Skip to content

Commit b60a683

Browse files
authored
feat(spanner): add support for snapshot isolation (#2245)
This PR contains code changes to add support for option IsolationLevel at the client level and at the transaction level. supported methods are(RW and Blind Write): ``` - writeAtLeastOnce - runTransactionAsync - runTransaction - getTransaction - async getTransaction(from transaction runner class) ```
1 parent 74a1989 commit b60a683

File tree

12 files changed

+473
-40
lines changed

12 files changed

+473
-40
lines changed

observability-test/database.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ import * as pfy from '@google-cloud/promisify';
4848
import {grpc} from 'google-gax';
4949
import {MockError} from '../test/mockserver/mockspanner';
5050
import {FakeSessionFactory} from '../test/database';
51+
import {RunTransactionOptions} from '../src/transaction-runner';
5152
const {generateWithAllSpansHaveDBName} = require('./helper');
5253

5354
const fakePfy = extend({}, pfy, {
@@ -147,6 +148,7 @@ class FakeTransaction extends EventEmitter {
147148
setQueuedMutations(mutation) {
148149
this._queuedMutations = mutation;
149150
}
151+
setReadWriteTransactionOptions(options: RunTransactionOptions) {}
150152
commit(
151153
options?: CommitOptions,
152154
callback?: CommitCallback

src/database.ts

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import {
4242
google,
4343
google as spannerClient,
4444
} from '../protos/protos';
45+
import IsolationLevel = google.spanner.v1.TransactionOptions.IsolationLevel;
4546
import {
4647
CreateDatabaseCallback,
4748
CreateDatabaseOptions,
@@ -316,6 +317,10 @@ export interface RestoreOptions {
316317
gaxOptions?: CallOptions;
317318
}
318319

320+
export interface WriteAtLeastOnceOptions extends CallOptions {
321+
isolationLevel?: IsolationLevel;
322+
}
323+
319324
/**
320325
* Create a Database object to interact with a Cloud Spanner database.
321326
*
@@ -2221,12 +2226,9 @@ class Database extends common.GrpcServiceObject {
22212226
options.requestOptions
22222227
);
22232228
}
2224-
if (options.optimisticLock) {
2225-
transaction!.useOptimisticLock();
2226-
}
2227-
if (options.excludeTxnFromChangeStreams) {
2228-
transaction!.excludeTxnFromChangeStreams();
2229-
}
2229+
transaction?.setReadWriteTransactionOptions(
2230+
options as RunTransactionOptions
2231+
);
22302232

22312233
if (!err) {
22322234
span.addEvent('Using Session', {'session.id': session?.id});
@@ -3274,12 +3276,10 @@ class Database extends common.GrpcServiceObject {
32743276
}
32753277

32763278
transaction!._observabilityOptions = this._observabilityOptions;
3277-
if (options.optimisticLock) {
3278-
transaction!.useOptimisticLock();
3279-
}
3280-
if (options.excludeTxnFromChangeStreams) {
3281-
transaction!.excludeTxnFromChangeStreams();
3282-
}
3279+
3280+
transaction!.setReadWriteTransactionOptions(
3281+
options as RunTransactionOptions
3282+
);
32833283

32843284
const release = () => {
32853285
this.pool_.release(session!);
@@ -3406,12 +3406,9 @@ class Database extends common.GrpcServiceObject {
34063406
transaction.requestOptions || {},
34073407
options.requestOptions
34083408
);
3409-
if (options.optimisticLock) {
3410-
transaction.useOptimisticLock();
3411-
}
3412-
if (options.excludeTxnFromChangeStreams) {
3413-
transaction.excludeTxnFromChangeStreams();
3414-
}
3409+
transaction!.setReadWriteTransactionOptions(
3410+
options as RunTransactionOptions
3411+
);
34153412
sessionId = session?.id;
34163413
span.addEvent('Using Session', {'session.id': sessionId});
34173414
const runner = new AsyncTransactionRunner<T>(
@@ -3638,17 +3635,17 @@ class Database extends common.GrpcServiceObject {
36383635
writeAtLeastOnce(mutations: MutationSet): Promise<CommitResponse>;
36393636
writeAtLeastOnce(
36403637
mutations: MutationSet,
3641-
options: CallOptions
3638+
options: WriteAtLeastOnceOptions
36423639
): Promise<CommitResponse>;
36433640
writeAtLeastOnce(mutations: MutationSet, callback: CommitCallback): void;
36443641
writeAtLeastOnce(
36453642
mutations: MutationSet,
3646-
options: CallOptions,
3643+
options: WriteAtLeastOnceOptions,
36473644
callback: CommitCallback
36483645
): void;
36493646
writeAtLeastOnce(
36503647
mutations: MutationSet,
3651-
optionsOrCallback?: CallOptions | CommitCallback,
3648+
optionsOrCallback?: WriteAtLeastOnceOptions | CommitCallback,
36523649
callback?: CommitCallback
36533650
): void | Promise<CommitResponse> {
36543651
const cb =
@@ -3657,7 +3654,7 @@ class Database extends common.GrpcServiceObject {
36573654
: callback;
36583655
const options =
36593656
typeof optionsOrCallback === 'object' && optionsOrCallback
3660-
? (optionsOrCallback as CallOptions)
3657+
? (optionsOrCallback as WriteAtLeastOnceOptions)
36613658
: {};
36623659

36633660
return startTrace('Database.writeAtLeastOnce', this._traceConfig, span => {
@@ -3683,6 +3680,9 @@ class Database extends common.GrpcServiceObject {
36833680
span.addEvent('Using Session', {'session.id': session?.id});
36843681
this._releaseOnEnd(session!, transaction!, span);
36853682
try {
3683+
transaction!.setReadWriteTransactionOptions(
3684+
options as RunTransactionOptions
3685+
);
36863686
transaction?.setQueuedMutations(mutations.proto());
36873687
return transaction?.commit(options, (err, resp) => {
36883688
if (err) {

src/index.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ import {
6060
ClientOptions,
6161
} from 'google-gax';
6262
import {google, google as instanceAdmin} from '../protos/protos';
63+
import IsolationLevel = google.spanner.v1.TransactionOptions.IsolationLevel;
6364
import {
6465
PagedOptions,
6566
PagedResponse,
@@ -145,6 +146,7 @@ export interface SpannerOptions extends GrpcClientOptions {
145146
sslCreds?: grpc.ChannelCredentials;
146147
routeToLeaderEnabled?: boolean;
147148
directedReadOptions?: google.spanner.v1.IDirectedReadOptions | null;
149+
defaultTransactionOptions?: Pick<RunTransactionOptions, 'isolationLevel'>;
148150
observabilityOptions?: ObservabilityOptions;
149151
}
150152
export interface RequestConfig {
@@ -247,6 +249,7 @@ class Spanner extends GrpcService {
247249
commonHeaders_: {[k: string]: string};
248250
routeToLeaderEnabled = true;
249251
directedReadOptions: google.spanner.v1.IDirectedReadOptions | null;
252+
defaultTransactionOptions: RunTransactionOptions;
250253
_observabilityOptions: ObservabilityOptions | undefined;
251254

252255
/**
@@ -331,6 +334,13 @@ class Spanner extends GrpcService {
331334
: null;
332335
delete options.directedReadOptions;
333336

337+
const defaultTransactionOptions = options.defaultTransactionOptions
338+
? options.defaultTransactionOptions
339+
: {
340+
isolationLevel: IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED,
341+
};
342+
delete options.defaultTransactionOptions;
343+
334344
const emulatorHost = Spanner.getSpannerEmulatorHost();
335345
if (
336346
emulatorHost &&
@@ -371,6 +381,7 @@ class Spanner extends GrpcService {
371381
this.projectIdReplaced_ = false;
372382
this.projectFormattedName_ = 'projects/' + this.projectId;
373383
this.directedReadOptions = directedReadOptions;
384+
this.defaultTransactionOptions = defaultTransactionOptions;
374385
this._observabilityOptions = options.observabilityOptions;
375386
this.commonHeaders_ = getCommonHeaders(
376387
this.projectFormattedName_,
@@ -2068,6 +2079,7 @@ export {MutationSet};
20682079
*/
20692080
import * as protos from '../protos/protos';
20702081
import IInstanceConfig = instanceAdmin.spanner.admin.instance.v1.IInstanceConfig;
2082+
import {RunTransactionOptions} from './transaction-runner';
20712083
export {v1, protos};
20722084
export default {Spanner};
20732085
export {Float32, Float, Int, Struct, Numeric, PGNumeric, SpannerDate};

src/table.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ import {
3737
setSpanError,
3838
traceConfig,
3939
} from './instrument';
40+
import {google} from '../protos/protos';
41+
import IsolationLevel = google.spanner.v1.TransactionOptions.IsolationLevel;
4042

4143
export type Key = string | string[];
4244

@@ -53,6 +55,7 @@ export type DropTableCallback = UpdateSchemaCallback;
5355
interface MutateRowsOptions extends CommitOptions {
5456
requestOptions?: Omit<IRequestOptions, 'requestTag'>;
5557
excludeTxnFromChangeStreams?: boolean;
58+
isolationLevel?: IsolationLevel;
5659
}
5760

5861
export type DeleteRowsCallback = CommitCallback;
@@ -1100,10 +1103,16 @@ class Table {
11001103
? options.excludeTxnFromChangeStreams
11011104
: false;
11021105

1106+
const isolationLevel =
1107+
'isolationLevel' in options
1108+
? options.isolationLevel
1109+
: IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED;
1110+
11031111
this.database.runTransaction(
11041112
{
11051113
requestOptions: requestOptions,
11061114
excludeTxnFromChangeStreams: excludeTxnFromChangeStreams,
1115+
isolationLevel: isolationLevel,
11071116
},
11081117
(err, transaction) => {
11091118
if (err) {

src/transaction-runner.ts

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import {isSessionNotFoundError} from './session-pool';
2626
import {Database} from './database';
2727
import {google} from '../protos/protos';
2828
import IRequestOptions = google.spanner.v1.IRequestOptions;
29+
import IsolationLevel = google.spanner.v1.TransactionOptions.IsolationLevel;
2930

3031
// eslint-disable-next-line @typescript-eslint/no-var-requires
3132
const jsonProtos = require('../protos/protos.json');
@@ -46,6 +47,7 @@ export interface RunTransactionOptions {
4647
requestOptions?: Pick<IRequestOptions, 'transactionTag'>;
4748
optimisticLock?: boolean;
4849
excludeTxnFromChangeStreams?: boolean;
50+
isolationLevel?: IsolationLevel;
4951
}
5052

5153
/**
@@ -124,7 +126,10 @@ export abstract class Runner<T> {
124126
this.transaction = transaction;
125127
this.transaction.useInRunner();
126128

127-
const defaults = {timeout: 3600000};
129+
const defaults = {
130+
timeout: 3600000,
131+
isolationLevel: IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED,
132+
};
128133

129134
this.options = Object.assign(defaults, options);
130135
}
@@ -202,12 +207,9 @@ export abstract class Runner<T> {
202207
const transaction = this.session.transaction(
203208
(this.session.parent as Database).queryOptions_
204209
);
205-
if (this.options.optimisticLock) {
206-
transaction.useOptimisticLock();
207-
}
208-
if (this.options.excludeTxnFromChangeStreams) {
209-
transaction.excludeTxnFromChangeStreams();
210-
}
210+
transaction!.setReadWriteTransactionOptions(
211+
this.options as RunTransactionOptions
212+
);
211213
if (this.attempts > 0) {
212214
await transaction.begin();
213215
}

src/transaction.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import {
4141
getCommonHeaders,
4242
} from './common';
4343
import {google} from '../protos/protos';
44+
import IsolationLevel = google.spanner.v1.TransactionOptions.IsolationLevel;
4445
import IAny = google.protobuf.IAny;
4546
import IQueryOptions = google.spanner.v1.ExecuteSqlRequest.IQueryOptions;
4647
import IRequestOptions = google.spanner.v1.IRequestOptions;
@@ -52,6 +53,7 @@ import {
5253
setSpanError,
5354
setSpanErrorAndException,
5455
} from './instrument';
56+
import {RunTransactionOptions} from './transaction-runner';
5557

5658
export type Rows = Array<Row | Json>;
5759
const RETRY_INFO_TYPE = 'type.googleapis.com/google.rpc.retryinfo';
@@ -1822,6 +1824,7 @@ export class Transaction extends Dml {
18221824

18231825
this._queuedMutations = [];
18241826
this._options = {readWrite: options};
1827+
this._options.isolationLevel = IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED;
18251828
this.requestOptions = requestOptions;
18261829
}
18271830

@@ -2700,6 +2703,28 @@ export class Transaction extends Dml {
27002703
excludeTxnFromChangeStreams(): void {
27012704
this._options.excludeTxnFromChangeStreams = true;
27022705
}
2706+
2707+
setReadWriteTransactionOptions(options: RunTransactionOptions) {
2708+
/**
2709+
* Set optimistic concurrency control for the transaction.
2710+
*/
2711+
if (options?.optimisticLock) {
2712+
this._options.readWrite!.readLockMode = ReadLockMode.OPTIMISTIC;
2713+
}
2714+
/**
2715+
* Set option excludeTxnFromChangeStreams=true to exclude read/write transactions
2716+
* from being tracked in change streams.
2717+
*/
2718+
if (options?.excludeTxnFromChangeStreams) {
2719+
this._options.excludeTxnFromChangeStreams = true;
2720+
}
2721+
/**
2722+
* Set isolation level .
2723+
*/
2724+
this._options.isolationLevel = options?.isolationLevel
2725+
? options?.isolationLevel
2726+
: this._getSpanner().defaultTransactionOptions.isolationLevel;
2727+
}
27032728
}
27042729

27052730
/*! Developer Documentation

test/database.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ import {google} from '../protos/protos';
3939
import {protos} from '../src';
4040
import * as inst from '../src/instance';
4141
import RequestOptions = google.spanner.v1.RequestOptions;
42+
import IsolationLevel = google.spanner.v1.TransactionOptions.IsolationLevel;
43+
import ReadLockMode = google.spanner.v1.TransactionOptions.ReadWrite.ReadLockMode;
4244
import EncryptionType = google.spanner.admin.database.v1.RestoreDatabaseEncryptionConfig.EncryptionType;
4345
import {
4446
BatchWriteOptions,
@@ -47,6 +49,7 @@ import {
4749
MutationSet,
4850
} from '../src/transaction';
4951
import {SessionFactory} from '../src/session-factory';
52+
import {RunTransactionOptions} from '../src/transaction-runner';
5053
let promisified = false;
5154
const fakePfy = extend({}, pfy, {
5255
promisifyAll(klass, options) {
@@ -177,6 +180,7 @@ class FakeTransaction extends EventEmitter {
177180
setQueuedMutations(mutation) {
178181
this._queuedMutations = mutation;
179182
}
183+
setReadWriteTransactionOptions(options: RunTransactionOptions) {}
180184
commit(
181185
options?: CommitOptions,
182186
callback?: CommitCallback
@@ -3166,6 +3170,17 @@ describe('Database', () => {
31663170
assert.strictEqual(options, fakeOptions);
31673171
});
31683172

3173+
it('should optionally accept runner `option` isolationLevel', async () => {
3174+
const fakeOptions = {
3175+
isolationLevel: IsolationLevel.REPEATABLE_READ,
3176+
};
3177+
3178+
await database.runTransaction(fakeOptions, assert.ifError);
3179+
3180+
const options = fakeTransactionRunner.calledWith_[3];
3181+
assert.strictEqual(options, fakeOptions);
3182+
});
3183+
31693184
it('should release the session when finished', done => {
31703185
const releaseStub = (
31713186
sandbox.stub(pool, 'release') as sinon.SinonStub
@@ -3237,6 +3252,17 @@ describe('Database', () => {
32373252
assert.strictEqual(options, fakeOptions);
32383253
});
32393254

3255+
it('should optionally accept runner `option` isolationLevel', async () => {
3256+
const fakeOptions = {
3257+
isolationLevel: IsolationLevel.REPEATABLE_READ,
3258+
};
3259+
3260+
await database.runTransactionAsync(fakeOptions, assert.ifError);
3261+
3262+
const options = fakeAsyncTransactionRunner.calledWith_[3];
3263+
assert.strictEqual(options, fakeOptions);
3264+
});
3265+
32403266
it('should return the runners resolved value', async () => {
32413267
const fakeValue = {};
32423268

test/index.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import {
3838
GetInstancesOptions,
3939
} from '../src';
4040
import {CLOUD_RESOURCE_HEADER} from '../src/common';
41+
import IsolationLevel = protos.google.spanner.v1.TransactionOptions.IsolationLevel;
4142
const singer = require('./data/singer');
4243
const music = singer.examples.spanner.music;
4344

@@ -327,6 +328,20 @@ describe('Spanner', () => {
327328
assert.strictEqual(spanner.directedReadOptions, fakeDirectedReadOptions);
328329
});
329330

331+
it('should optionally accept defaultTransactionOptions', () => {
332+
const fakeDefaultTxnOptions = {
333+
defaultTransactionOptions: {
334+
isolationLevel: IsolationLevel.REPEATABLE_READ,
335+
},
336+
};
337+
338+
const spanner = new Spanner(fakeDefaultTxnOptions);
339+
assert.strictEqual(
340+
spanner.defaultTransactionOptions,
341+
fakeDefaultTxnOptions.defaultTransactionOptions
342+
);
343+
});
344+
330345
it('should set projectFormattedName_', () => {
331346
assert.strictEqual(
332347
spanner.projectFormattedName_,

0 commit comments

Comments
 (0)