Skip to content

SNOW-2041980 Kerberos Proxy Auth - NodeJS #1102

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 26 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import { WIP_ConnectionConfig } from './lib/connection/types';
* The snowflake-sdk module provides an instance to connect to the Snowflake server
* @see [source] {@link https://docs.snowflake.com/en/developer-guide/node-js/nodejs-driver}
*/
import { HttpHeadersCustomizer } from './lib/connection/types';

declare module 'snowflake-sdk' {

export const ErrorCode: typeof import('./lib/error_code').default;
Expand Down Expand Up @@ -343,6 +345,12 @@ declare module 'snowflake-sdk' {
* The option to pass passcode from DUO.
*/
passcode?: string;

/**
* Customizes the HTTP headers sent with each request.
* The customizer functions are called with the HTTP method and URL.
*/
httpHeadersCustomizer?: Array<HttpHeadersCustomizer>;
}

export interface Connection {
Expand Down
14 changes: 14 additions & 0 deletions lib/connection/connection_config.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ const os = require('os');
const url = require('url');
const Util = require('../util');
const ProxyUtil = require('../proxy_util');
const { isValidHTTPHeadersCustomizer } = require('../http/request_util');
const Errors = require('../errors');
const ConnectionConstants = require('../constants/connection_constants');
const path = require('path');
Expand Down Expand Up @@ -74,6 +75,7 @@ const DEFAULT_PARAMS =
'oauthScope',
'oauthChallengeMethod',
'oauthHttpAllowed', //only for tests
'httpHeadersCustomizer',
'workloadIdentityProvider',
'workloadIdentityAzureEntraIdResource'
];
Expand Down Expand Up @@ -591,6 +593,14 @@ function ConnectionConfig(options, validateCredentials, qaMode, clientInfo) {
passcode = options.passcode;
}

let httpHeadersCustomizer = [];
if (Util.exists(options.httpHeadersCustomizer)) {
Errors.checkArgumentValid(isValidHTTPHeadersCustomizer(options.httpHeadersCustomizer),
ErrorCodes.ERR_CONN_CREATE_INVALID_HTTP_HEADER_CUSTOMIZERS);

httpHeadersCustomizer = options.httpHeadersCustomizer;
}

if (validateDefaultParameters) {
for (const [key] of Object.entries(options)) {
if (!DEFAULT_PARAMS.includes(key)) {
Expand Down Expand Up @@ -964,6 +974,10 @@ function ConnectionConfig(options, validateCredentials, qaMode, clientInfo) {
return oauthHttpAllowed || false;
};

this.getHttpHeadersCustomizer = function () {
return httpHeadersCustomizer;
};

/**
* Returns attributes of Connection Config object that can be used to identify
* the connection, when ID is not available in the scope. This is not sufficient set,
Expand Down
21 changes: 12 additions & 9 deletions lib/connection/statement.js
Original file line number Diff line number Diff line change
Expand Up @@ -1473,15 +1473,17 @@
const sendRequest = function () {
// if this is a retry and a query parameter should be appended to the url on
// retry, update the url
if ((numRetries > 0) && appendQueryParamOnRetry) {
const retryOption = {
url: urlOrig,
retryCount: numRetries,
retryReason: lastStatusCodeForRetry,
includeRetryReason: connectionConfig.getIncludeRetryReason(),
};

options.url = Util.url.appendRetryParam(retryOption);
if (numRetries > 0) {
options.isRetry = true;
if (appendQueryParamOnRetry) {
const retryOption = {

Check warning on line 1479 in lib/connection/statement.js

View check run for this annotation

Codecov / codecov/patch

lib/connection/statement.js#L1477-L1479

Added lines #L1477 - L1479 were not covered by tests
url: urlOrig,
retryCount: numRetries,
retryReason: lastStatusCodeForRetry,
includeRetryReason: connectionConfig.getIncludeRetryReason(),
};
options.url = Util.url.appendRetryParam(retryOption);

Check warning on line 1485 in lib/connection/statement.js

View check run for this annotation

Codecov / codecov/patch

lib/connection/statement.js#L1485

Added line #L1485 was not covered by tests
}
}

sf.request(options);
Expand All @@ -1497,6 +1499,7 @@
)) {
// increment the retry count
numRetries++;
options.isRetry = true;

Check warning on line 1502 in lib/connection/statement.js

View check run for this annotation

Codecov / codecov/patch

lib/connection/statement.js#L1502

Added line #L1502 was not covered by tests
lastStatusCodeForRetry = err.response ? err.response.statusCode : 0;

// use exponential backoff with decorrelated jitter to compute the
Expand Down
5 changes: 5 additions & 0 deletions lib/connection/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,8 @@ export interface WIP_ConnectionConfig {
*/
workloadIdentityAzureEntraIdResource?: string;
}

export interface HttpHeadersCustomizer {
applies(method: string, url: string): boolean;
newHeaders() : Record<string, any>;
}
1 change: 1 addition & 0 deletions lib/constants/error_messages.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ exports[404059] = 'Invalid oauth client id. The specified value must not be an e
exports[404060] = 'Invalid oauth client secret. The specified value must not be an empty string';
exports[404061] = 'Invalid oauth token request URL. The specified value must be a valid URL starting with the https or http protocol.';
exports[404062] = 'No workload identity credentials were found. Provider: %s';
exports[404063] = 'Invalid Http headers customizer. The specified value must contain the following functions: \'applices\', \'newHeaders\' and \'invokeOnce\'';

// 405001
exports[405001] = 'Invalid callback. The specified value must be a function.';
Expand Down
1 change: 1 addition & 0 deletions lib/error_code.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ enum ErrorCode {
ERR_CONN_CREATE_INVALID_OUATH_CLIENT_SECRET = 404060,
ERR_CONN_CREATE_INVALID_OUATH_TOKEN_REQUEST_URL = 404061,
ERR_CONN_CREATE_MISSING_WORKLOAD_IDENTITY_CREDENTIALS = 404062,
ERR_CONN_CREATE_INVALID_HTTP_HEADER_CUSTOMIZERS = 404063,

// 405001
ERR_CONN_CONNECT_INVALID_CALLBACK = 405001,
Expand Down
29 changes: 21 additions & 8 deletions lib/file_transfer_agent/gcs_util.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
const getProxyAgent = require('../http/node').getProxyAgent;
const ProxyUtil = require('../proxy_util');
const Util = require('../util');
const RequestUtil = require('../http/request_util');
const { shouldPerformGCPBucket, lstrip } = require('../util');

const GCS_METADATA_PREFIX = 'x-goog-meta-';
Expand Down Expand Up @@ -144,7 +145,9 @@
let matDescKey;

try {
if (shouldPerformGCPBucket(accessToken) && !isProxyEnabled) {
const customHeaders = RequestUtil.getCustomHeaders(connectionConfig, 'HEAD', url);
RequestUtil.cleanOverridingHeaders(gcsHeaders, customHeaders);
if (shouldPerformGCPBucket(accessToken) && !isProxyEnabled && Object.keys(customHeaders).length === 0) {
const gcsLocation = this.extractBucketNameAndPath(meta['stageInfo']['location']);
const metadata = await meta['client'].gcsClient
.bucket(gcsLocation.bucketName)
Expand All @@ -156,7 +159,7 @@
encryptionDataProp = metadata[0].metadata[ENCRYPTIONDATAPROP];
matDescKey = metadata[0].metadata[MATDESC_KEY];
} else {
const response = await axios.head(url, { headers: gcsHeaders });
const response = await axios.head(url, { headers: meta.isRetry ? gcsHeaders : { ...gcsHeaders, ...customHeaders } });

digest = response.headers[GCS_METADATA_SFC_DIGEST];
contentLength = response.headers['content-length'];
Expand Down Expand Up @@ -188,6 +191,7 @@
if ([403, 408, 429, 500, 503].includes(errCode)) {
meta['lastError'] = err;
meta['resultStatus'] = resultStatus.NEED_RETRY;
meta.isRetry = true;
return;
}
if (errCode === 404) {
Expand Down Expand Up @@ -283,7 +287,10 @@
}

try {
if (shouldPerformGCPBucket(accessToken) && !isProxyEnabled) {
const customHeaders = RequestUtil.getCustomHeaders(connectionConfig, 'PUT', uploadUrl);
RequestUtil.cleanOverridingHeaders(gcsHeaders, customHeaders);

if (shouldPerformGCPBucket(accessToken) && !isProxyEnabled && Object.keys(customHeaders).length === 0) {
const gcsLocation = this.extractBucketNameAndPath(meta['stageInfo']['location']);

await meta['client'].gcsClient
Expand All @@ -295,18 +302,19 @@
metadata: {
[ENCRYPTIONDATAPROP]: gcsHeaders[GCS_METADATA_ENCRYPTIONDATAPROP],
[MATDESC_KEY]: gcsHeaders[GCS_METADATA_MATDESC_KEY],
[SFC_DIGEST]: gcsHeaders[GCS_METADATA_SFC_DIGEST]
[SFC_DIGEST]: gcsHeaders[GCS_METADATA_SFC_DIGEST],
}
}
});
} else {
// Set maxBodyLength to allow large file uploading
await axios.put(uploadUrl, fileStream, { maxBodyLength: Infinity, headers: gcsHeaders });
await axios.put(uploadUrl, fileStream, { maxBodyLength: Infinity, headers: meta.isRetry ? gcsHeaders : { ...gcsHeaders, ...customHeaders } });
}
} catch (err) {
if ([403, 408, 429, 500, 503].includes(err['code'])) {
meta['lastError'] = err;
meta['resultStatus'] = resultStatus.NEED_RETRY;
meta['resultStatus'] = resultStatus.NEED_RETRY,
meta.isRetry = true;
} else if (!accessToken && err['code'] === 400 &&
(!meta['lastError'] || meta['lastError']['code'] !== 400)) {
// Only attempt to renew urls if this isn't the second time this happens
Expand Down Expand Up @@ -356,7 +364,10 @@
let size;

try {
if (shouldPerformGCPBucket(accessToken) && !isProxyEnabled) {
const customHeaders = RequestUtil.getCustomHeaders(connectionConfig, 'GET', downloadUrl);
RequestUtil.cleanOverridingHeaders(gcsHeaders, customHeaders);

if (shouldPerformGCPBucket(accessToken) && !isProxyEnabled && Object.keys(customHeaders).length === 0) {
const gcsLocation = this.extractBucketNameAndPath(meta['stageInfo']['location']);
await meta['client'].gcsClient
.bucket(gcsLocation.bucketName)
Expand All @@ -377,7 +388,7 @@
} else {
let response;
await axios.get(downloadUrl, {
headers: gcsHeaders,
headers: meta.isRetry ? gcsHeaders : { ...gcsHeaders, ...customHeaders },
responseType: 'stream'
}).then(async (res) => {
response = res;
Expand Down Expand Up @@ -406,8 +417,10 @@
meta['lastError'] = err;
if (err['code'] === ERRORNO_WSAECONNABORTED) {
meta['resultStatus'] = resultStatus.NEED_RETRY_WITH_LOWER_CONCURRENCY;
meta.isRetry = true;

Check warning on line 420 in lib/file_transfer_agent/gcs_util.js

View check run for this annotation

Codecov / codecov/patch

lib/file_transfer_agent/gcs_util.js#L420

Added line #L420 was not covered by tests
} else {
meta['resultStatus'] = resultStatus.NEED_RETRY;
meta.isRetry = true;

Check warning on line 423 in lib/file_transfer_agent/gcs_util.js

View check run for this annotation

Codecov / codecov/patch

lib/file_transfer_agent/gcs_util.js#L423

Added line #L423 was not covered by tests
}
}
return;
Expand Down
7 changes: 6 additions & 1 deletion lib/file_transfer_agent/remote_storage_util.js
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@
} else {
dataFile = meta['realSrcFilePath'];
}

meta.isRetry = false;

const utilClass = this.getForStorageType(meta['stageInfo']['locationType']);

let maxConcurrency = meta['parallel'];
Expand Down Expand Up @@ -198,6 +199,7 @@
const sleepingTime = Math.min(Math.pow(2, retry), 16);
await new Promise(resolve => setTimeout(resolve, sleepingTime));
}
meta.isRetry = true;

Check warning on line 202 in lib/file_transfer_agent/remote_storage_util.js

View check run for this annotation

Codecov / codecov/patch

lib/file_transfer_agent/remote_storage_util.js#L202

Added line #L202 was not covered by tests
} else if (meta['resultStatus'] === resultStatus.NEED_RETRY_WITH_LOWER_CONCURRENCY) {
lastErr = meta['lastError'];
// Failed to upload file, retrying with max concurrency
Expand All @@ -209,6 +211,7 @@
const sleepingTime = Math.min(Math.pow(2, retry), 16);
await new Promise(resolve => setTimeout(resolve, sleepingTime));
}
meta.isRetry = true;

Check warning on line 214 in lib/file_transfer_agent/remote_storage_util.js

View check run for this annotation

Codecov / codecov/patch

lib/file_transfer_agent/remote_storage_util.js#L214

Added line #L214 was not covered by tests
}
}
if (lastErr) {
Expand Down Expand Up @@ -377,6 +380,7 @@
const sleepingTime = Math.min(Math.pow(2, retry), 16);
await new Promise(resolve => setTimeout(resolve, sleepingTime));
}
meta.isRetry = true;

Check warning on line 383 in lib/file_transfer_agent/remote_storage_util.js

View check run for this annotation

Codecov / codecov/patch

lib/file_transfer_agent/remote_storage_util.js#L383

Added line #L383 was not covered by tests
} else if (meta['resultStatus'] === resultStatus.NEED_RETRY) {
lastErr = meta['lastError'];
// Failed to download file, retrying
Expand All @@ -385,6 +389,7 @@
await new Promise(resolve => setTimeout(resolve, sleepingTime));
}
}
meta.isRetry = true;

Check warning on line 392 in lib/file_transfer_agent/remote_storage_util.js

View check run for this annotation

Codecov / codecov/patch

lib/file_transfer_agent/remote_storage_util.js#L392

Added line #L392 was not covered by tests
}
if (lastErr) {
throw new Error(lastErr);
Expand Down
24 changes: 24 additions & 0 deletions lib/file_transfer_agent/s3_util.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
const expandTilde = require('expand-tilde');
const getProxyAgent = require('../http/node').getProxyAgent;
const ProxyUtil = require('../proxy_util');
const RequestUtil = require('../http/request_util');

const AMZ_IV = 'x-amz-iv';
const AMZ_KEY = 'x-amz-key';
Expand Down Expand Up @@ -103,6 +104,9 @@
Key: s3location.s3path + filename
};

if (!meta.isRetry) {
this.addCustomHeaders(params, 'GET', client?.config?.endPoint || SNOWFLAKE_S3_DESTINATION);
}
let akey;

try {
Expand Down Expand Up @@ -187,6 +191,10 @@
Metadata: s3Metadata
};

if (meta.isRetry) {
this.addCustomHeaders(params, 'PUT', client?.config?.endPoint || SNOWFLAKE_S3_DESTINATION);
}

// call S3 to upload file to specified bucket
try {
await client.putObject(params);
Expand All @@ -197,8 +205,10 @@
meta['lastError'] = err;
if (err['Code'] === ERRORNO_WSAECONNABORTED.toString()) {
meta['resultStatus'] = resultStatus.NEED_RETRY_WITH_LOWER_CONCURRENCY;
meta.isRetry = true;
} else {
meta['resultStatus'] = resultStatus.NEED_RETRY;
meta.isRetry = true;
}
}
return;
Expand Down Expand Up @@ -226,6 +236,9 @@
Key: s3location.s3path + meta['dstFileName'],
};

if (meta.isRetry) {
this.addCustomHeaders(params, 'GET', client?.config?.endPoint || SNOWFLAKE_S3_DESTINATION);

Check warning on line 240 in lib/file_transfer_agent/s3_util.js

View check run for this annotation

Codecov / codecov/patch

lib/file_transfer_agent/s3_util.js#L240

Added line #L240 was not covered by tests
}
// call S3 to download file to specified bucket
try {
await client.getObject(params)
Expand All @@ -247,14 +260,23 @@
meta['lastError'] = err;
if (err['Code'] === ERRORNO_WSAECONNABORTED.toString()) {
meta['resultStatus'] = resultStatus.NEED_RETRY_WITH_LOWER_CONCURRENCY;
meta.isRetry = true;

Check warning on line 263 in lib/file_transfer_agent/s3_util.js

View check run for this annotation

Codecov / codecov/patch

lib/file_transfer_agent/s3_util.js#L263

Added line #L263 was not covered by tests
} else {
meta['resultStatus'] = resultStatus.NEED_RETRY;
meta.isRetry = true;

Check warning on line 266 in lib/file_transfer_agent/s3_util.js

View check run for this annotation

Codecov / codecov/patch

lib/file_transfer_agent/s3_util.js#L266

Added line #L266 was not covered by tests
}
}
return;
}
meta['resultStatus'] = resultStatus.DOWNLOADED;
};

this.addCustomHeaders = function (params, method, endPoint) {
const customHeaders = RequestUtil.getCustomHeaders(connectionConfig, method, endPoint);
if (Object.keys(customHeaders).length > 0) {
params['Metadata'] ? params['Metadata'] = { ...params['Metadata'], ...customHeaders } : params['Metadata'] = customHeaders;

Check warning on line 277 in lib/file_transfer_agent/s3_util.js

View check run for this annotation

Codecov / codecov/patch

lib/file_transfer_agent/s3_util.js#L277

Added line #L277 was not covered by tests
}
};
}

/**
Expand Down Expand Up @@ -285,4 +307,6 @@
return S3Location(bucketName, s3path);
}



module.exports = { S3Util, SNOWFLAKE_S3_DESTINATION, DATA_SIZE_THRESHOLD, extractBucketNameAndPath };
23 changes: 20 additions & 3 deletions lib/http/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,6 @@ HttpClient.prototype.getAgent = function () {
return null;
};

module.exports = HttpClient;

function sanitizeAxiosResponse(response) {
Logger.getInstance().trace('Request%s - sanitizing response data.', requestUtil.describeRequestFromResponse(response));
Expand All @@ -238,7 +237,17 @@ function sanitizeAxiosError(error) {

function prepareRequestOptions(options, requestHandlers = {}) {
Logger.getInstance().trace('Request%s - constructing options.', requestUtil.describeRequestFromOptions(options));
const headers = normalizeHeaders(options.headers) || {};
let headers = normalizeHeaders(options.headers) || {};
if (!options.isRetry) {
//Save the original headers.
requestUtil.setOriginalHeader(headers);
const customHeaders = requestUtil.getCustomHeaders(this._connectionConfig, options.method, options.url);
requestUtil.cleanOverridingHeaders(headers, customHeaders);
Object.assign(headers, customHeaders);
} else {
Logger.getInstance().debug(`Customizer should only run on the first attempt and this is a ${options.retry} retry. Skipping.`);
headers = requestUtil.getOriginalHeader();
}

const timeout = options.timeout ||
this._connectionConfig.getTimeout() ||
Expand All @@ -261,7 +270,6 @@ function prepareRequestOptions(options, requestHandlers = {}) {
}
});
}

const params = options.params;

let mock;
Expand Down Expand Up @@ -371,3 +379,12 @@ function normalizeResponse(response) {

return response;
}

//Testing purposes only
function getHttpRequestHeaders(connectionConfig, options) {
const httpClient = new HttpClient(connectionConfig);
httpClient.constructExponentialBackoffStrategy = () => 0;
return prepareRequestOptions.call(httpClient, options).headers;
}

module.exports = { HttpClient, getHttpRequestHeaders };
Loading
Loading