Skip to content

feat: track precommit token in r/w apis(multiplexed session) #2312

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jun 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions src/session-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ export interface SessionFactoryInterface {
*/
getSessionForPartitionedOps(callback: GetSessionCallback): void;

/**
* When called returns a session for read write.
*
* @name SessionFactoryInterface#getSessionForReadWrite
* @param {GetSessionCallback} callback The callback function.
*/
getSessionForReadWrite(callback: GetSessionCallback): void;

/**
* When called returns the pool object.
*
Expand Down Expand Up @@ -106,6 +114,7 @@ export class SessionFactory
pool_: SessionPoolInterface;
isMultiplexed: boolean;
isMultiplexedPartitionOps: boolean;
isMultiplexedRW: boolean;
constructor(
database: Database,
name: String,
Expand All @@ -131,6 +140,10 @@ export class SessionFactory
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS === 'true' &&
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS ===
'true';

this.isMultiplexedRW =
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS === 'true' &&
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW === 'true';
// Multiplexed sessions should only be created if its enabled.
if (this.isMultiplexed) {
this.multiplexedSession_.on('error', this.emit.bind(database, 'error'));
Expand Down Expand Up @@ -174,6 +187,23 @@ export class SessionFactory
: this.pool_.getSession(callback);
}

/**
* Retrieves a session for read write operations, selecting the appropriate session type
* based on whether multiplexed sessions are enabled.
*
* If multiplexed sessions are enabled for read write this methods delegates the request to `getSession()`, which returns
* either a multiplexed session or a regular session based on the configuration.
*
* If the multiplexed sessions are disabled, a session is retrieved from the regular session pool.
*
* @param {GetSessionCallback} callback The callback function.
*/
getSessionForReadWrite(callback: GetSessionCallback): void {
this.isMultiplexedRW
? this.getSession(callback)
: this.pool_.getSession(callback);
}

/**
* Returns the regular session pool object.
*
Expand Down
33 changes: 32 additions & 1 deletion src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,11 @@ export interface RunUpdateCallback {
export type CommitCallback =
NormalCallback<spannerClient.spanner.v1.ICommitResponse>;

type PrecommitTokenProvider =
| spannerClient.spanner.v1.ITransaction
| spannerClient.spanner.v1.IPartialResultSet
| spannerClient.spanner.v1.IExecuteBatchDmlResponse;

/**
* @typedef {object} TimestampBounds
* @property {boolean} [strong=true] Read at a timestamp where all previously
Expand Down Expand Up @@ -286,6 +291,10 @@ export class Snapshot extends EventEmitter {
protected _waitingRequests: Array<() => void>;
protected _inlineBeginStarted;
protected _useInRunner = false;
protected _latestPreCommitToken:
| spannerClient.spanner.v1.IMultiplexedSessionPrecommitToken
| undefined
| null;
id?: Uint8Array | string;
ended: boolean;
metadata?: spannerClient.spanner.v1.ITransaction;
Expand Down Expand Up @@ -371,6 +380,17 @@ export class Snapshot extends EventEmitter {
opts: this._observabilityOptions,
dbName: this._dbName,
};
this._latestPreCommitToken = null;
}

protected _updatePrecommitToken(resp: PrecommitTokenProvider): void {
if (
this._latestPreCommitToken === null ||
this._latestPreCommitToken === undefined ||
this._latestPreCommitToken!.seqNum! < resp.precommitToken!.seqNum!
) {
this._latestPreCommitToken = resp.precommitToken;
}
}

/**
Expand Down Expand Up @@ -477,6 +497,7 @@ export class Snapshot extends EventEmitter {
if (err) {
setSpanError(span, err);
} else {
this._updatePrecommitToken(resp);
this._update(resp);
}
span.end();
Expand Down Expand Up @@ -778,6 +799,7 @@ export class Snapshot extends EventEmitter {
},
)
?.on('response', response => {
this._updatePrecommitToken(response);
if (response.metadata && response.metadata!.transaction && !this.id) {
this._update(response.metadata!.transaction);
}
Expand Down Expand Up @@ -1381,6 +1403,7 @@ export class Snapshot extends EventEmitter {
},
)
.on('response', response => {
this._updatePrecommitToken(response);
if (response.metadata && response.metadata!.transaction && !this.id) {
this._update(response.metadata!.transaction);
}
Expand Down Expand Up @@ -2040,6 +2063,8 @@ export class Transaction extends Dml {
return;
}

this._updatePrecommitToken(resp);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove blank line

const {resultSets, status} = resp;
for (const resultSet of resultSets) {
if (!this.id && resultSet.metadata?.transaction) {
Expand Down Expand Up @@ -2182,8 +2207,14 @@ export class Transaction extends Dml {

const mutations = this._queuedMutations;
const session = this.session.formattedName_!;
const precommitToken = this._latestPreCommitToken;
const requestOptions = (options as CommitOptions).requestOptions;
const reqOpts: CommitRequest = {mutations, session, requestOptions};
const reqOpts: CommitRequest = {
mutations,
session,
requestOptions,
precommitToken,
};

return startTrace(
'Transaction.commit',
Expand Down
94 changes: 94 additions & 0 deletions test/session-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,30 @@ describe('SessionFactory', () => {
assert.strictEqual(sessionFactory.isMultiplexed, true);
});
});

describe('when GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS and GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW both are disabled', () => {
before(() => {
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS = 'false';
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW = 'false';
});

it('should correctly initialize the isMultiplexedRW field', () => {
const sessionFactory = new SessionFactory(DATABASE, NAME, POOL_OPTIONS);
assert.strictEqual(sessionFactory.isMultiplexedRW, false);
});
});

describe('when GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS and GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW both are enabled', () => {
before(() => {
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS = 'true';
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW = 'true';
});

it('should correctly initialize the isMultiplexedRW field', () => {
const sessionFactory = new SessionFactory(DATABASE, NAME, POOL_OPTIONS);
assert.strictEqual(sessionFactory.isMultiplexedRW, true);
});
});
});

describe('getSession', () => {
Expand Down Expand Up @@ -222,6 +246,76 @@ describe('SessionFactory', () => {
});
});

describe('getSessionForReadWrite', () => {
describe('when multiplexed session for r/w disabled', () => {
before(() => {
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS = 'false';
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW = 'false';
});

it('should retrieve a regular session from the pool', done => {
(
sandbox.stub(sessionFactory.pool_, 'getSession') as sinon.SinonStub
).callsFake(callback => callback(null, fakeSession));
sessionFactory.getSessionForReadWrite((err, resp) => {
assert.strictEqual(err, null);
assert.strictEqual(resp, fakeSession);
done();
});
});

it('should propagate errors when regular session retrieval fails', done => {
const fakeError = new Error();
(
sandbox.stub(sessionFactory.pool_, 'getSession') as sinon.SinonStub
).callsFake(callback => callback(fakeError, null));
sessionFactory.getSessionForReadWrite((err, resp) => {
assert.strictEqual(err, fakeError);
assert.strictEqual(resp, null);
done();
});
});
});

describe('when multiplexed session for r/w enabled', () => {
before(() => {
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS = 'true';
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW = 'true';
});

it('should return the multiplexed session', done => {
(
sandbox.stub(
sessionFactory.multiplexedSession_,
'getSession',
) as sinon.SinonStub
).callsFake(callback => callback(null, fakeMuxSession));
sessionFactory.getSessionForReadWrite((err, resp) => {
assert.strictEqual(err, null);
assert.strictEqual(resp, fakeMuxSession);
assert.strictEqual(resp?.metadata.multiplexed, true);
assert.strictEqual(fakeMuxSession.metadata.multiplexed, true);
done();
});
});

it('should propagate error when multiplexed session return fails', done => {
const fakeError = new Error();
(
sandbox.stub(
sessionFactory.multiplexedSession_,
'getSession',
) as sinon.SinonStub
).callsFake(callback => callback(fakeError, null));
sessionFactory.getSessionForReadWrite((err, resp) => {
assert.strictEqual(err, fakeError);
assert.strictEqual(resp, null);
done();
});
});
});
});

describe('getPool', () => {
it('should return the session pool object', () => {
const pool = sessionFactory.getPool();
Expand Down
Loading
Loading