1 11
/*!
2 11
 * Copyright 2018 Google Inc. All Rights Reserved.
3 11
 *
4 11
 * Licensed under the Apache License, Version 2.0 (the "License");
5 11
 * you may not use this file except in compliance with the License.
6 11
 * You may obtain a copy of the License at
7 11
 *
8 11
 *      http://www.apache.org/licenses/LICENSE-2.0
9 11
 *
10 11
 * Unless required by applicable law or agreed to in writing, software
11 11
 * distributed under the License is distributed on an "AS IS" BASIS,
12 11
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 11
 * See the License for the specific language governing permissions and
14 11
 * limitations under the License.
15 11
 */
16 11

17 11
import {PreciseDate} from '@google-cloud/precise-date';
18 11
import {promisifyAll} from '@google-cloud/promisify';
19 11
import * as extend from 'extend';
20 11
import * as is from 'is';
21 11
import {Snapshot} from './transaction';
22 11
import {google} from '../protos/protos';
23 11
import {Session} from '.';
24 11

25 11
export interface TransactionIdentifier {
26 11
  session: string | Session;
27 11
  transaction?: string;
28 11
  timestamp?: google.protobuf.ITimestamp;
29 11
}
30 11

31 11
/**
32 11
 * Use a BatchTransaction object to create partitions and read/query against
33 11
 * your Cloud Spanner database.
34 11
 *
35 11
 * @class
36 11
 * @extends Snapshot
37 11
 *
38 11
 * @param {TimestampBounds} [options] [Timestamp Bounds](https://cloud.google.com/spanner/docs/timestamp-bounds).
39 11
 */
40 11
class BatchTransaction extends Snapshot {
41 11
  /**
42 11
   * Closes all open resources.
43 11
   *
44 11
   * When the transaction is no longer needed, you should call this method to
45 11
   * free up resources allocated by the Batch client.
46 11
   *
47 11
   * Calling this method would render the transaction unusable everywhere. In
48 11
   * particular if this transaction object was being used across multiple
49 11
   * machines, calling this method on any of the machine would make the
50 11
   * transaction unusable on all the machines. This should only be called when
51 11
   * the transaction is no longer needed anywhere
52 11
   *
53 11
   * @param {BasicCallback} [callback] Callback function.
54 11
   * @returns {Promise<BasicResponse>}
55 11
   *
56 11
   * @example
57 11
   * const {Spanner} = require('@google-cloud/spanner');
58 11
   * const spanner = new Spanner();
59 11
   *
60 11
   * const instance = spanner.instance('my-instance');
61 11
   * const database = instance.database('my-database');
62 11
   *
63 11
   * database.createBatchTransaction(function(err, transaction) {
64 11
   *   if (err) {
65 11
   *     // Error handling omitted.
66 11
   *   }
67 11
   *
68 11
   *   transaction.close(function(err, apiResponse) {});
69 11
   * });
70 11
   *
71 11
   * //-
72 11
   * // If the callback is omitted, we'll return a Promise.
73 11
   * //-
74 11
   * database.createBatchTransaction().then(function(data) {
75 11
   *   const transaction = data[0];
76 11
   *   return transaction.close();
77 11
   * });
78 11
   */
79 11
  close(callback) {
80 11
    this.session.delete(callback);
81 11
  }
82 11
  /**
83 11
   * @see [`ExecuteSqlRequest`](https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#google.spanner.v1.ExecuteSqlRequest)
84 11
   * @typedef {object} QueryPartition
85 11
   * @property {string} partitionToken The partition token.
86 11
   */
87 11
  /**
88 11
   * @typedef {array} CreateQueryPartitionsResponse
89 11
   * @property {QueryPartition[]} 0 List of query partitions.
90 11
   * @property {object} 1 The full API response.
91 11
   */
92 11
  /**
93 11
   * @callback CreateQueryPartitionsCallback
94 11
   * @param {?Error} err Request error, if any.
95 11
   * @param {QueryPartition[]} partitions List of query partitions.
96 11
   * @param {object} apiResponse The full API response.
97 11
   */
98 11
  /**
99 11
   * Creates a set of query partitions that can be used to execute a query
100 11
   * operation in parallel. Partitions become invalid when the transaction used
101 11
   * to create them is closed.
102 11
   *
103 11
   * @param {string|object} query A SQL query or
104 11
   *     [`ExecuteSqlRequest`](https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#google.spanner.v1.ExecuteSqlRequest)
105 11
   *     object.
106 11
   * @param {object} [query.gaxOptions] Request configuration options, outlined
107 11
   *     here: https://googleapis.github.io/gax-nodejs/CallSettings.html.
108 11
   * @param {object} [query.params] A map of parameter name to values.
109 11
   * @param {object} [query.partitionOptions] A map of partition options.
110 11
   * @param {object} [query.types] A map of parameter types.
111 11
   * @param {CreateQueryPartitionsCallback} [callback] Callback callback function.
112 11
   * @returns {Promise<CreateQueryPartitionsResponse>}
113 11
   *
114 11
   * @example <caption>include:samples/batch.js</caption>
115 11
   * region_tag:spanner_batch_client
116 11
   */
117 11
  createQueryPartitions(query, callback) {
118 11
    if (is.string(query)) {
119 11
      query = {
120 11
        sql: query,
121 11
      };
122 11
    }
123 11

124 11
    const reqOpts = Object.assign({}, query, Snapshot.encodeParams(query));
125 11

126 11
    delete reqOpts.gaxOptions;
127 11
    delete reqOpts.types;
128 11

129 11
    this.createPartitions_(
130 11
      {
131 11
        client: 'SpannerClient',
132 11
        method: 'partitionQuery',
133 11
        reqOpts,
134 11
        gaxOpts: query.gaxOptions,
135 11
      },
136 11
      callback
137 11
    );
138 11
  }
139 11
  /**
140 11
   * Generic create partition method. Handles common parameters used in both
141 11
   * {@link BatchTransaction#createQueryPartitions} and {@link
142 11
   * BatchTransaction#createReadPartitions}
143 11
   *
144 11
   * @private
145 11
   *
146 11
   * @param {object} config The request config.
147 11
   * @param {function} callback Callback function.
148 11
   */
149 11
  createPartitions_(config, callback) {
150 11
    const query = extend({}, config.reqOpts, {
151 11
      session: this.session.formattedName_,
152 11
      transaction: {id: this.id},
153 11
    });
154 11
    config.reqOpts = extend({}, query);
155 11
    delete query.partitionOptions;
156 11
    this.session.request(config, (err, resp) => {
157 11
      if (err) {
158 11
        callback(err, null, resp);
159 11
        return;
160 11
      }
161 11

162 11
      const partitions = resp.partitions.map(partition => {
163 11
        return extend({}, query, partition);
164 11
      });
165 11

166 11
      if (resp.transaction) {
167 11
        const {id, readTimestamp} = resp.transaction;
168 11

169 11
        this.id = id;
170 11

171 11
        if (readTimestamp) {
172 11
          this.readTimestampProto = readTimestamp;
173 11
          this.readTimestamp = new PreciseDate(readTimestamp);
174 11
        }
175 11
      }
176 11

177 11
      callback(null, partitions, resp);
178 11
    });
179 11
  }
180 11
  /**
181 11
   * @typedef {object} ReadPartition
182 11
   * @mixes ReadRequestOptions
183 11
   * @property {string} partitionToken The partition token.
184 11
   * @property {object} [gaxOptions] Request configuration options, outlined
185 11
   *     here: https://googleapis.github.io/gax-nodejs/CallSettings.html.
186 11
   */
187 11
  /**
188 11
   * @typedef {array} CreateReadPartitionsResponse
189 11
   * @property {ReadPartition[]} 0 List of read partitions.
190 11
   * @property {object} 1 The full API response.
191 11
   */
192 11
  /**
193 11
   * @callback CreateReadPartitionsCallback
194 11
   * @param {?Error} err Request error, if any.
195 11
   * @param {ReadPartition[]} partitions List of read partitions.
196 11
   * @param {object} apiResponse The full API response.
197 11
   */
198 11
  /**
199 11
   * Creates a set of read partitions that can be used to execute a read
200 11
   * operation in parallel. Partitions become invalid when the transaction used
201 11
   * to create them is closed.
202 11
   *
203 11
   * @param {ReadRequestOptions} options Configuration object, describing what to
204 11
   *     read from.
205 11
   * @param {CreateReadPartitionsCallback} [callback] Callback function.
206 11
   * @returns {Promise<CreateReadPartitionsResponse>}
207 11
   */
208 11
  createReadPartitions(options, callback) {
209 11
    const reqOpts = Object.assign({}, options, {
210 11
      keySet: Snapshot.encodeKeySet(options),
211 11
    });
212 11

213 11
    delete reqOpts.gaxOptions;
214 11
    delete reqOpts.keys;
215 11
    delete reqOpts.ranges;
216 11

217 11
    this.createPartitions_(
218 11
      {
219 11
        client: 'SpannerClient',
220 11
        method: 'partitionRead',
221 11
        reqOpts,
222 11
        gaxOpts: options.gaxOptions,
223 11
      },
224 11
      callback
225 11
    );
226 11
  }
227 11
  /**
228 11
   * Executes partition.
229 11
   *
230 11
   * @see {@link Transaction#read} when using {@link ReadPartition}.
231 11
   * @see {@link Transaction#run} when using {@link QueryParition}.
232 11
   *
233 11
   * @param {ReadPartition|QueryParition} partition The partition object.
234 11
   * @param {object} [partition.gaxOptions] Request configuration options,
235 11
   *     outlined here:
236 11
   * https://googleapis.github.io/gax-nodejs/CallSettings.html.
237 11
   * @param {TransactionRequestReadCallback|RunCallback} [callback] Callback
238 11
   *     function.
239 11
   * @returns {Promise<RunResponse>|Promise<TransactionRequestReadResponse>}
240 11
   *
241 11
   * @example <caption>include:samples/batch.js</caption>
242 11
   * region_tag:spanner_batch_execute_partitions
243 11
   */
244 11
  execute(partition, callback) {
245 11
    if (is.string(partition.table)) {
246 11
      this.read(partition.table, partition, callback);
247 11
      return;
248 11
    }
249 11
    this.run(partition, callback);
250 11
  }
251 11
  /**
252 11
   * Executes partition in streaming mode.
253 11
   *
254 11
   * @see {@link Transaction#createReadStream} when using {@link ReadPartition}.
255 11
   * @see {@link Transaction#runStream} when using {@link QueryPartition}.
256 11
   *
257 11
   * @param {ReadPartition|QueryPartition} partition The partition object.
258 11
   * @returns {ReadableStream} A readable stream that emits rows.
259 11
   *
260 11
   * @example
261 11
   * const {Spanner} = require('@google-cloud/spanner');
262 11
   * const spanner = new Spanner();
263 11
   *
264 11
   * const instance = spanner.instance('my-instance');
265 11
   * const database = instance.database('my-database');
266 11
   *
267 11
   * database.createBatchTransaction(function(err, transaction) {
268 11
   *   if (err) {
269 11
   *     // Error handling omitted.
270 11
   *   }
271 11
   *
272 11
   *   transaction.createReadPartitions(options, function(err, partitions) {
273 11
   *     const partition = partitions[0];
274 11
   *
275 11
   *     transaction
276 11
   *       .executeStream(partition)
277 11
   *       .on('error', function(err) {})
278 11
   *       .on('data', function(row) {
279 11
   *         // row = [
280 11
   *         //   {
281 11
   *         //     name: 'SingerId',
282 11
   *         //     value: '1'
283 11
   *         //   },
284 11
   *         //   {
285 11
   *         //     name: 'Name',
286 11
   *         //     value: 'Eddie Wilson'
287 11
   *         //   }
288 11
   *         // ]
289 11
   *       })
290 11
   *       .on('end', function() {
291 11
   *         // All results retrieved
292 11
   *       });
293 11
   *   });
294 11
   * });
295 11
   */
296 11
  executeStream(partition) {
297 11
    if (is.string(partition.table)) {
298 11
      return this.createReadStream(partition.table, partition);
299 11
    }
300 11
    return this.runStream(partition);
301 11
  }
302 11
  /**
303 11
   * @typedef {object} TransactionIdentifier
304 11
   * @property {string} session The full session name.
305 11
   * @property {string} transaction The transaction ID.
306 11
   * @property {string|Date} readTimestamp The transaction read timestamp.
307 11
   */
308 11
  /**
309 11
   * Creates a transaction identifier used to reference the transaction in
310 11
   * workers.
311 11
   *
312 11
   * @returns {TransactionIdentifier}
313 11
   *
314 11
   * @example
315 11
   * const {Spanner} = require('@google-cloud/spanner');
316 11
   * const spanner = new Spanner();
317 11
   *
318 11
   * const instance = spanner.instance('my-instance');
319 11
   * const database = instance.database('my-database');
320 11
   *
321 11
   * database.createBatchTransaction(function(err, transaction) {
322 11
   *   const identifier = transaction.identifier();
323 11
   * });
324 11
   */
325 11
  identifier(): TransactionIdentifier {
326 11
    return {
327 11
      transaction: (this.id! as Buffer).toString('base64'),
328 11
      session: this.session.id,
329 11
      timestamp: this.readTimestampProto,
330 11
    };
331 11
  }
332 11
}
333 11

334 11
/*! Developer Documentation
335 11
 *
336 11
 * All async methods (except for streams) will return a Promise in the event
337 11
 * that a callback is omitted.
338 11
 */
339 11
promisifyAll(BatchTransaction, {
340 11
  exclude: ['identifier'],
341 11
});
342 11

343 11
export {BatchTransaction};

Read our documentation on viewing source code .

Loading