Skip to content

Commit 21dddb3

Browse files
js07 and jcortes authored
feat(platform): add file-stream utility (#16952)
This commit introduces a new helper for working with file streams, providing a unified interface for reading local and remote files.

* bump package minor version
* update pnpm-lock.yaml
* suppress linter error by allowing 'any' type for event and request body

---------

Co-authored-by: Jorge Cortes <[email protected]>
1 parent 06adeff commit 21dddb3

File tree

10 files changed

+633
-24
lines changed

10 files changed

+633
-24
lines changed

platform/__tests__/file-stream.js

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
const {
2+
getFileStream, getFileStreamAndMetadata,
3+
} = require("../dist");
4+
const fs = require("fs");
5+
const path = require("path");
6+
const http = require("http");
7+
const os = require("os");
8+
9+
/**
 * Test helper: drains a readable stream and returns everything it produced
 * as a single UTF-8 string.
 *
 * @param {import("stream").Readable} stream - stream to consume
 * @returns {Promise<string>} the concatenated content, resolved on "end"
 * @throws rejects with the stream's error, or with a generic error when the
 *   stream closes before emitting "end", so a failing stream fails the test
 *   instead of hanging it
 */
async function readStreamContent(stream) {
  return new Promise((resolve, reject) => {
    const chunks = [];

    // Collect raw bytes and decode once at the end: decoding chunk by chunk
    // corrupts multi-byte characters that straddle a chunk boundary.
    stream.on("data", (chunk) => {
      chunks.push(Buffer.isBuffer(chunk)
        ? chunk
        : Buffer.from(chunk));
    });

    stream.once("end", () => resolve(Buffer.concat(chunks).toString()));
    stream.once("error", reject);
    // After "end" has resolved the promise this rejection is a no-op.
    stream.once("close", () => reject(new Error("Stream closed before end")));
  });
}
22+
23+
/**
 * Test helper: resolves once the stream has emitted "close".
 *
 * @param {import("stream").Readable} stream - stream whose teardown to await
 * @returns {Promise<void>} resolves immediately when the stream is already
 *   closed, otherwise on its next "close" event
 */
async function waitForStreamCleanup(stream) {
  // A stream that closed before we got here will never emit "close" again,
  // so waiting for the event would hang the test.
  if (stream.closed) {
    return;
  }

  await new Promise((resolve) => {
    stream.once("close", () => resolve());
  });
}
29+
30+
/**
 * Integration tests for getFileStream / getFileStreamAndMetadata.
 *
 * A fixture file is written next to this test and a throwaway HTTP server is
 * started on an OS-assigned port, so the suite cannot collide with another
 * process that already owns a fixed port.
 */
describe("file-stream", () => {
  const TEST_FILE_CONTENT = "test content for file stream";
  const JSON_BODY = "{\"test\": \"data\"}";
  // Prefix the implementation gives the temp files it creates.
  const TEMP_FILE_PREFIX = "file-stream-";

  let testFilePath;
  let server;
  let baseUrl;

  // Only look at temp files created by the helper: the OS temp directory is
  // shared, so its total entry count can change for unrelated reasons.
  const listTempFiles = () =>
    fs.readdirSync(os.tmpdir()).filter((entry) => entry.startsWith(TEMP_FILE_PREFIX));

  // The temp file is unlinked asynchronously from stream event handlers, so
  // poll briefly instead of assuming it is gone the moment "close" fires.
  async function findLeftoverTempFiles(before, timeoutMs = 1000) {
    const known = new Set(before);
    const deadline = Date.now() + timeoutMs;
    let leftovers = listTempFiles().filter((entry) => !known.has(entry));
    while (leftovers.length > 0 && Date.now() < deadline) {
      await new Promise((resolve) => setTimeout(resolve, 10));
      leftovers = listTempFiles().filter((entry) => !known.has(entry));
    }
    return leftovers;
  }

  beforeAll(() => {
    // Create a test file
    testFilePath = path.join(__dirname, "test-file.txt");
    fs.writeFileSync(testFilePath, TEST_FILE_CONTENT);

    // Create a simple HTTP server for testing remote files
    server = http.createServer((req, res) => {
      if (req.url === "/test-file.txt") {
        res.writeHead(200, {
          "Content-Type": "text/plain",
          "Content-Length": String(Buffer.byteLength(TEST_FILE_CONTENT)),
          "Last-Modified": new Date().toUTCString(),
          "ETag": "\"test-etag\"",
        });
        res.end(TEST_FILE_CONTENT);
      } else if (req.url === "/no-content-length") {
        res.writeHead(200, {
          "Content-Type": "application/json",
          "Last-Modified": new Date().toUTCString(),
        });
        res.end(JSON_BODY);
      } else {
        // "/error" and every other path
        res.writeHead(404, "Not Found");
        res.end();
      }
    });

    return new Promise((resolve, reject) => {
      server.once("error", reject);
      // Port 0 asks the OS for any free port.
      server.listen(0, "127.0.0.1", () => {
        baseUrl = `http://127.0.0.1:${server.address().port}`;
        resolve();
      });
    });
  });

  afterAll(() => {
    // Clean up test file
    if (fs.existsSync(testFilePath)) {
      fs.unlinkSync(testFilePath);
    }

    if (server) {
      return new Promise((resolve) => {
        server.close(resolve);
        // Keep-alive sockets left by fetch would otherwise delay close();
        // the method does not exist on older Node versions.
        server.closeAllConnections?.();
      });
    }
  });

  describe("getFileStream", () => {
    it("should return readable stream for local file", async () => {
      const stream = await getFileStream(testFilePath);
      expect(stream).toBeDefined();
      expect(typeof stream.read).toBe("function");

      const content = await readStreamContent(stream);
      expect(content).toBe(TEST_FILE_CONTENT);
    });

    it("should return readable stream for remote URL", async () => {
      const stream = await getFileStream(`${baseUrl}/test-file.txt`);
      expect(stream).toBeDefined();
      expect(typeof stream.read).toBe("function");

      const content = await readStreamContent(stream);
      expect(content).toBe(TEST_FILE_CONTENT);
    });

    it("should throw error for invalid URL", async () => {
      await expect(getFileStream(`${baseUrl}/error`))
        .rejects.toThrow("Failed to fetch");
    });

    it("should throw error for non-existent local file", async () => {
      await expect(getFileStream("/non/existent/file.txt"))
        .rejects.toThrow();
    });
  });

  describe("getFileStreamAndMetadata", () => {
    it("should return stream and metadata for local file", async () => {
      const result = await getFileStreamAndMetadata(testFilePath);

      expect(result.stream).toBeDefined();
      expect(typeof result.stream.read).toBe("function");
      expect(result.metadata).toMatchObject({
        size: Buffer.byteLength(TEST_FILE_CONTENT),
        name: "test-file.txt",
      });
      // NOTE(review): checked by constructor name rather than instanceof,
      // presumably because the fs Date comes from another realm under jest.
      expect(result.metadata.lastModified.constructor.name).toBe("Date");
      const content = await readStreamContent(result.stream);
      expect(content).toBe(TEST_FILE_CONTENT);
    });

    it("should return stream and metadata for remote file with content-length", async () => {
      const result = await getFileStreamAndMetadata(`${baseUrl}/test-file.txt`);

      expect(result.stream).toBeDefined();
      expect(typeof result.stream.read).toBe("function");
      expect(result.metadata).toMatchObject({
        size: Buffer.byteLength(TEST_FILE_CONTENT),
        contentType: "text/plain",
        name: "test-file.txt",
        etag: "\"test-etag\"",
      });
      expect(result.metadata.lastModified).toBeInstanceOf(Date);
      const content = await readStreamContent(result.stream);
      expect(content).toBe(TEST_FILE_CONTENT);
    });

    it("should handle remote file without content-length", async () => {
      const result = await getFileStreamAndMetadata(`${baseUrl}/no-content-length`);

      expect(result.stream).toBeDefined();
      expect(typeof result.stream.read).toBe("function");

      expect(result.metadata).toMatchObject({
        size: Buffer.byteLength(JSON_BODY), // Size determined after download
        contentType: "application/json",
      });
      expect(result.metadata.lastModified).toBeInstanceOf(Date);

      const content = await readStreamContent(result.stream);
      expect(content).toBe(JSON_BODY);
    });

    it("should throw error for invalid remote URL", async () => {
      await expect(getFileStreamAndMetadata(`${baseUrl}/error`))
        .rejects.toThrow("Failed to fetch");
    });
  });

  describe("temporary file cleanup", () => {
    it("should clean up temporary files after stream ends", async () => {
      const tempFilesBefore = listTempFiles();
      const result = await getFileStreamAndMetadata(`${baseUrl}/no-content-length`);

      const content = await readStreamContent(result.stream);
      // Wait for cleanup to complete by listening to close event
      await waitForStreamCleanup(result.stream);

      // Check that temp files were cleaned up
      expect(await findLeftoverTempFiles(tempFilesBefore)).toEqual([]);
      expect(content).toBe(JSON_BODY);
    });

    it("should clean up temporary files on stream error", async () => {
      // Check temp files before
      const tempFilesBefore = listTempFiles();

      const result = await getFileStreamAndMetadata(`${baseUrl}/no-content-length`);

      // Trigger an error and wait for cleanup
      result.stream.destroy(new Error("Test error"));
      await waitForStreamCleanup(result.stream);

      // Check that temp files were cleaned up
      expect(await findLeftoverTempFiles(tempFilesBefore)).toEqual([]);
    });
  });
});

platform/dist/file-stream.d.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/// <reference types="node" />
2+
import { Readable } from "stream";
3+
/**
 * Descriptive information returned alongside a file stream.
 */
export interface FileMetadata {
    /** Size of the content in bytes (from the file stats or the Content-Length header; measured after download when the header is absent). */
    size: number;
    /** MIME type, taken from the Content-Type header or looked up from the file name; undefined when neither yields one. */
    contentType?: string;
    /** Modification time: the file's mtime locally, the Last-Modified header remotely. */
    lastModified?: Date;
    /** Base name of the local path or of the URL's pathname. */
    name?: string;
    /** Value of the ETag response header; only populated for remote files. */
    etag?: string;
}
10+
/**
 * Opens a readable stream over a local file or a remote resource.
 *
 * @param pathOrUrl - a file path or a URL
 * @returns a Readable stream of the file content
 * @throws Error when the remote response is not OK or has no body, or when
 *   the local path cannot be stat'ed
 */
export declare function getFileStream(pathOrUrl: string): Promise<Readable>;
15+
/**
 * Opens a readable stream over a local file or a remote resource and
 * reports what is known about it.
 *
 * For remote resources served without a Content-Length header the body is
 * first downloaded to a temporary file so that `metadata.size` can be filled in.
 *
 * @param pathOrUrl - a file path or a URL
 * @returns a Readable stream of the file content and its metadata
 * @throws Error when the remote response is not OK or has no body, or when
 *   the local path cannot be stat'ed
 */
export declare function getFileStreamAndMetadata(pathOrUrl: string): Promise<{
    stream: Readable;
    metadata: FileMetadata;
}>;

platform/dist/file-stream.js

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
"use strict";
2+
Object.defineProperty(exports, "__esModule", { value: true });
3+
exports.getFileStreamAndMetadata = exports.getFileStream = void 0;
4+
const stream_1 = require("stream");
5+
const fs_1 = require("fs");
6+
const os_1 = require("os");
7+
const path_1 = require("path");
8+
const promises_1 = require("stream/promises");
9+
const uuid_1 = require("uuid");
10+
const mime = require("mime-types");
11+
/**
 * Opens a readable stream over a local file or a remote resource.
 *
 * @param pathOrUrl - a file path or a URL
 * @returns a Readable stream of the file content
 * @throws {Error} "Failed to fetch ..." when the remote response is not OK or
 *   has no body; whatever safeStat throws when the local path cannot be stat'ed
 */
async function getFileStream(pathOrUrl) {
    if (!isUrl(pathOrUrl)) {
        // Stat first so a missing file fails here rather than later as an
        // "error" event on the stream.
        await safeStat(pathOrUrl);
        return fs_1.createReadStream(pathOrUrl);
    }
    const response = await fetch(pathOrUrl);
    if (!response.ok || !response.body) {
        if (response.body) {
            // The body will never be read; cancel it so the underlying
            // connection can be released. Best effort only.
            try {
                await response.body.cancel();
            }
            catch (_a) {
                // Ignore: the fetch failure below is the error worth reporting
            }
        }
        throw new Error(`Failed to fetch ${pathOrUrl}: ${response.status} ${response.statusText}`);
    }
    return stream_1.Readable.fromWeb(response.body);
}
28+
exports.getFileStream = getFileStream;
29+
/**
 * Opens a readable stream over a local file or a remote resource and
 * returns it together with the file's metadata.
 *
 * @param pathOrUrl - a file path or a URL
 * @returns a Readable stream of the file content and its metadata
 */
async function getFileStreamAndMetadata(pathOrUrl) {
    // Pick the loader by input kind, then delegate to it.
    const load = isUrl(pathOrUrl)
        ? getRemoteFileStreamAndMetadata
        : getLocalFileStreamAndMetadata;
    return await load(pathOrUrl);
}
41+
exports.getFileStreamAndMetadata = getFileStreamAndMetadata;
42+
/**
 * Tells whether the input should be treated as a URL rather than a file path.
 *
 * @param pathOrUrl - a file path or a URL
 * @returns true when the input parses as an absolute URL whose scheme is
 *   longer than one letter; false otherwise
 */
function isUrl(pathOrUrl) {
    let parsed;
    try {
        parsed = new URL(pathOrUrl);
    }
    catch (_a) {
        return false;
    }
    // A Windows path such as "C:\dir\file.txt" parses as a URL with scheme
    // "c:". A one-letter scheme is a drive letter, so treat it as a path.
    return parsed.protocol.length > 2;
}
51+
/**
 * Stats a path, reporting any failure as a "File not found" error.
 *
 * @param path - file system path to stat
 * @returns the fs.Stats for the path
 * @throws {Error} "File not found: <path>" when the stat call fails; the
 *   original error is kept on `cause` so callers can tell a missing file
 *   from, for example, a permission problem
 */
async function safeStat(path) {
    try {
        return await fs_1.promises.stat(path);
    }
    catch (err) {
        throw new Error(`File not found: ${path}`, { cause: err });
    }
}
59+
/**
 * Builds the stream-plus-metadata result for a file on the local disk.
 *
 * @param filePath - path of the file to open
 * @returns a read stream over the file and metadata taken from its stats,
 *   its base name and a MIME lookup on the path
 */
async function getLocalFileStreamAndMetadata(filePath) {
    const { size, mtime } = await safeStat(filePath);
    // mime.lookup yields a falsy value for unknown types; expose that as undefined.
    const detectedType = mime.lookup(filePath);
    const metadata = {
        size,
        lastModified: mtime,
        name: path_1.basename(filePath),
        contentType: detectedType || undefined,
    };
    return {
        stream: fs_1.createReadStream(filePath),
        metadata,
    };
}
74+
/**
 * Fetches a remote resource and returns a stream over its body plus metadata
 * derived from the response headers and the URL.
 *
 * The body is streamed directly only when the Content-Length header can be
 * trusted as the size of the bytes the stream will yield. Otherwise it is
 * downloaded to a temporary file first so the size can be measured.
 *
 * @param url - absolute URL of the resource
 * @returns the body stream and its metadata
 * @throws {Error} "Failed to fetch ..." when the response is not OK or has no body
 */
async function getRemoteFileStreamAndMetadata(url) {
    const response = await fetch(url);
    if (!response.ok || !response.body) {
        if (response.body) {
            // The body will never be read; cancel it so the underlying
            // connection can be released. Best effort only.
            try {
                await response.body.cancel();
            }
            catch (_a) {
                // Ignore: the fetch failure below is the error worth reporting
            }
        }
        throw new Error(`Failed to fetch ${url}: ${response.status} ${response.statusText}`);
    }
    const headers = response.headers;
    const urlObj = new URL(url);
    // An unparsable Last-Modified header would give an Invalid Date; report
    // it as unknown instead.
    const lastModifiedHeader = headers.get("last-modified");
    const parsedLastModified = lastModifiedHeader ? new Date(lastModifiedHeader) : undefined;
    const lastModified = parsedLastModified && !Number.isNaN(parsedLastModified.getTime())
        ? parsedLastModified
        : undefined;
    const baseMetadata = {
        contentType: headers.get("content-type") || mime.lookup(urlObj.pathname) || undefined,
        lastModified,
        name: path_1.basename(urlObj.pathname),
        etag: headers.get("etag") || undefined,
    };
    const contentLength = (headers.get("content-length") || "").trim();
    const contentEncoding = (headers.get("content-encoding") || "").trim().toLowerCase();
    // With a content coding (gzip, br, ...) Content-Length counts the encoded
    // bytes, while fetch hands us the decoded body, so the header would
    // misreport the size of what the stream yields.
    const bodyIsEncoded = contentEncoding !== "" && contentEncoding !== "identity";
    // If we have a trustworthy content-length, we can stream directly
    if (/^\d+$/.test(contentLength) && !bodyIsEncoded) {
        return {
            stream: stream_1.Readable.fromWeb(response.body),
            metadata: {
                ...baseMetadata,
                size: Number.parseInt(contentLength, 10),
            },
        };
    }
    // No usable content-length - need to download to temporary file to get size
    return await downloadToTemporaryFile(response, baseMetadata);
}
109+
/**
 * Saves a response body to a uniquely named file in the OS temp directory so
 * its size can be measured, then returns a read stream over that file.
 *
 * The temp file is removed when the returned stream ends, errors or closes.
 * If the download itself fails, the file is removed and the error rethrown.
 *
 * @param response - fetch Response whose body is still unread
 * @param baseMetadata - metadata already derived from the headers and URL
 * @returns a read stream over the downloaded copy and the metadata completed
 *   with the measured `size`
 */
async function downloadToTemporaryFile(response, baseMetadata) {
    // Generate unique temporary file path
    const tempFilePath = path_1.join(os_1.tmpdir(), `file-stream-${uuid_1.v4()}`);
    // Best-effort removal shared by every cleanup path; failures (for
    // instance the file already being gone) are deliberately ignored.
    const removeTempFile = async () => {
        try {
            await fs_1.promises.unlink(tempFilePath);
        }
        catch (_a) {
            // Ignore cleanup errors
        }
    };
    try {
        // Wrap the body before creating the write stream: if wrapping throws
        // (e.g. the body is already locked) no temp file has been created yet.
        const webStream = stream_1.Readable.fromWeb(response.body);
        // Download to temporary file
        await promises_1.pipeline(webStream, fs_1.createWriteStream(tempFilePath));
        const stats = await fs_1.promises.stat(tempFilePath);
        const metadata = {
            ...baseMetadata,
            size: stats.size,
        };
        const stream = fs_1.createReadStream(tempFilePath);
        // More than one of these may fire for a single stream; the repeated
        // unlink attempts are harmless because their failures are ignored.
        for (const event of ["close", "end", "error"]) {
            stream.once(event, removeTempFile);
        }
        return {
            stream,
            metadata,
        };
    }
    catch (err) {
        // Cleanup on error
        await removeTempFile();
        throw err;
    }
}

platform/dist/index.d.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ import axios, { transformConfigForOauth } from "./axios";
33
import { AxiosRequestConfig as AxiosConfig } from "axios";
44
export { axios, transformConfigForOauth, };
55
export { cloneSafe, jsonStringifySafe, } from "./utils";
6+
export { getFileStreamAndMetadata, getFileStream, } from "./file-stream";
7+
export type { FileMetadata, } from "./file-stream";
68
export { ConfigurationError, } from "./errors";
79
export { default as sqlProp, } from "./sql-prop";
810
export type { ColumnSchema, DbInfo, TableInfo, TableMetadata, TableSchema, } from "./sql-prop";

platform/dist/index.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ Object.defineProperty(exports, "transformConfigForOauth", { enumerable: true, ge
88
var utils_1 = require("./utils");
99
Object.defineProperty(exports, "cloneSafe", { enumerable: true, get: function () { return utils_1.cloneSafe; } });
1010
Object.defineProperty(exports, "jsonStringifySafe", { enumerable: true, get: function () { return utils_1.jsonStringifySafe; } });
11+
var file_stream_1 = require("./file-stream");
12+
Object.defineProperty(exports, "getFileStreamAndMetadata", { enumerable: true, get: function () { return file_stream_1.getFileStreamAndMetadata; } });
13+
Object.defineProperty(exports, "getFileStream", { enumerable: true, get: function () { return file_stream_1.getFileStream; } });
1114
var errors_1 = require("./errors");
1215
Object.defineProperty(exports, "ConfigurationError", { enumerable: true, get: function () { return errors_1.ConfigurationError; } });
1316
var sql_prop_1 = require("./sql-prop");

0 commit comments

Comments
 (0)