Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 48 additions & 3 deletions src/cloud-sql-instance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ export class CloudSQLInstance {
private scheduledRefreshID?: ReturnType<typeof setTimeout> | null = undefined;
/* eslint-disable-next-line @typescript-eslint/no-explicit-any */
private throttle?: any;
private closed = false;
public readonly instanceInfo: InstanceConnectionInfo;
public ephemeralCert?: SslCert;
public host?: string;
Expand Down Expand Up @@ -106,17 +107,45 @@ export class CloudSQLInstance {
}) as ReturnType<typeof pThrottle>;
}

forceRefresh(): Promise<RefreshResult> {
forceRefresh(): Promise<void> {
// if a refresh is already ongoing, just await for its promise to fulfill
// so that a new instance info is available before reconnecting
if (this.next) {
return this.next;
return new Promise(resolve => {
if (this.next) {
this.next.finally(resolve);
} else {
resolve();
}
});
}

this.cancelRefresh();
return this.refresh();
this.scheduleRefresh(0);

return new Promise(resolve => {
// setTimeout() to yield execution to allow other refresh background
// task to start.
setTimeout(() => {
if (this.next) {
// If there is a refresh promise in progress, resolve this promise
// when the refresh is complete.
this.next.finally(resolve);
} else {
// Else resolve immediately.
resolve();
}
}, 0);
});
}

refresh(): Promise<RefreshResult> {
if (this.closed) {
this.scheduledRefreshID = undefined;
this.next = undefined;
return Promise.reject('closed');
}

const currentRefreshId = this.scheduledRefreshID;

// Since forceRefresh might be invoked during an ongoing refresh
Expand Down Expand Up @@ -183,6 +212,12 @@ export class CloudSQLInstance {
// used to create new connections to a Cloud SQL instance. It throws in
// case any of the internal steps fails.
private async performRefresh(): Promise<RefreshResult> {
if (this.closed) {
// The connector may be closed while the rate limiter delayed
// a call to performRefresh() so check this.closed before continuing.
return Promise.reject('closed');
}

const rsaKeys: RSAKeys = await generateKeys();
const metadata: InstanceMetadata =
await this.sqlAdminFetcher.getInstanceMetadata(this.instanceInfo);
Expand Down Expand Up @@ -244,6 +279,9 @@ export class CloudSQLInstance {
}

private scheduleRefresh(delay: number): void {
if (this.closed) {
return;
}
this.scheduledRefreshID = setTimeout(() => this.refresh(), delay);
}

Expand All @@ -260,4 +298,11 @@ export class CloudSQLInstance {
setEstablishedConnection(): void {
this.establishedConnection = true;
}

// close stops any refresh process in progress and prevents future refresh
// connections.
close(): void {
this.closed = true;
this.cancelRefresh();
}
}
6 changes: 3 additions & 3 deletions src/connector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,8 @@ export class Connector {
serverCaMode,
dnsName,
});
tlsSocket.once('error', async () => {
await cloudSqlInstance.forceRefresh();
tlsSocket.once('error', () => {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be async. Otherwise it will block the exit of the connector until the refresh is successful. This causes some failure test cases to hang and never exit.

cloudSqlInstance.forceRefresh();
});
tlsSocket.once('secureConnect', async () => {
cloudSqlInstance.setEstablishedConnection();
Expand Down Expand Up @@ -333,7 +333,7 @@ export class Connector {
// Also clear up any local proxy servers and socket connections.
close(): void {
for (const instance of this.instances.values()) {
instance.cancelRefresh();
instance.close();
}
for (const server of this.localProxies) {
server.close();
Expand Down
49 changes: 41 additions & 8 deletions test/cloud-sql-instance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,9 @@ t.test('CloudSQLInstance', async t => {
await CloudSQLInstance.prototype.refresh.call(instance);
instance.refresh = CloudSQLInstance.prototype.refresh;
};

await instance.forceRefresh();

t.ok(
cancelRefreshCalled,
'should cancelRefresh current refresh cycle on force refresh'
Expand All @@ -290,7 +292,7 @@ t.test('CloudSQLInstance', async t => {
let cancelRefreshCalled = false;
let refreshCalled = false;

const refreshPromise = instance.refresh();
instance.refresh();

instance.cancelRefresh = () => {
cancelRefreshCalled = true;
Expand All @@ -302,13 +304,7 @@ t.test('CloudSQLInstance', async t => {
return CloudSQLInstance.prototype.refresh.call(instance);
};

const forceRefreshPromise = instance.forceRefresh();
t.strictSame(
refreshPromise,
forceRefreshPromise,
'forceRefresh should return same promise ref from initial refresh call'
);
await forceRefreshPromise;
await instance.forceRefresh();

t.ok(
!cancelRefreshCalled,
Expand Down Expand Up @@ -481,6 +477,43 @@ t.test('CloudSQLInstance', async t => {
}
);

t.test(
'close on established connection and ongoing failed cycle',
async t => {
let metadataCount = 0;
const failAndSlowFetcher = {
...fetcher,
async getInstanceMetadata() {
await (() => new Promise(res => setTimeout(res, 50)))();
metadataCount++;
return fetcher.getInstanceMetadata();
},
};

const instance = new CloudSQLInstance({
ipType: IpAddressTypes.PUBLIC,
authType: AuthTypes.PASSWORD,
instanceConnectionName: 'my-project:us-east1:my-instance',
sqlAdminFetcher: failAndSlowFetcher,
limitRateInterval: 50,
});

await instance.refresh();
instance.setEstablishedConnection();

// starts a new refresh cycle but do not await on it
instance.close();
await instance.forceRefresh();
t.equal(metadataCount, 1, 'No refresh after close');
Comment on lines +504 to +507
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this mean we do not error on a closed instance?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't throw an error from forceRefresh(). The node connector will throw an error when trying to call connect() on a closed instance, but that's coming in a later PR.


await t.rejects(
instance.refresh(),
'closed',
'Refresh after close rejected.'
);
}
);

t.test(
'get invalid certificate data while having a current valid',
async t => {
Expand Down
Loading