Skip to content

Support streaming in final commit call #286

Open
@coyotte508

Description

@coyotte508

This first needs better browser support, as well as HTTP/2 support on hf.co's side.

Some code snippets:

	// https://github.com/whatwg/fetch/issues/1438#issuecomment-1228635501
	/**
	 * Feature-detects whether a `ReadableStream` can be used as a streaming request body.
	 *
	 * Never throws: any failure while probing is reported as "not supported".
	 *
	 * @returns `true` only when `ENABLE_REQUEST_STREAM` is truthy, the `ReadableStream` and `Request`
	 * globals exist, the runtime read the `duplex` option, and the probe request did not get a
	 * `Content-Type` header.
	 */
	function supportRequestStream(): boolean {
		if (!ENABLE_REQUEST_STREAM) {
			return false;
		}
		// `typeof` is required here: referencing an undeclared global directly throws a ReferenceError
		// instead of evaluating to a falsy value.
		if (typeof ReadableStream === "undefined" || typeof Request === "undefined") {
			return false;
		}

		try {
			// A runtime that knows about streaming request bodies has to read the `duplex` option.
			// Tracking that read filters out runtimes that accept a stream in the Request constructor
			// but cannot actually stream it through fetch.
			let duplexAccessed = false;

			// This works because the browser adds a Content-Type header of text/plain;charset=UTF-8 to the request
			// if the body is text.
			// The browser only treats the body as text if it doesn't support request streams,
			// otherwise it won't add a Content-Type header at all.
			const hasContentType = new Request("", {
				body: new ReadableStream(),
				method: "POST",
				// @ts-expect-error chrome support, see https://github.com/whatwg/fetch/issues/1438#issuecomment-1228635501
				get duplex() {
					duplexAccessed = true;
					return "half";
				},
			}).headers.has("Content-Type");

			return duplexAccessed && !hasContentType;
		} catch {
			// The Request constructor can throw (e.g. the empty URL cannot be resolved outside a document);
			// for a capability probe that simply means "not supported".
			return false;
		}
	}

	// https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream#async_iterator_to_stream
	/**
	 * Adapts an async iterator of strings to a pull-based `ReadableStream`.
	 *
	 * Each `pull` asks the iterator for exactly one value, so the iterator only advances as fast as the
	 * stream's consumer reads. If `iterator.next()` rejects, the rejected `pull` puts the stream in an
	 * errored state.
	 *
	 * @param iterator Source of the string chunks.
	 * @returns A stream that yields the iterator's values in order and closes when the iterator is done.
	 */
	function iteratorToStream(iterator: AsyncIterator<string>): ReadableStream<string> {
		return new ReadableStream<string>({
			async pull(controller) {
				const { value, done } = await iterator.next();

				if (done) {
					controller.close();
				} else {
					controller.enqueue(value);
				}
			},
			// Propagate cancellation (e.g. an aborted request) to the source so that a generator can run
			// its `finally` blocks instead of staying suspended forever. `return` is optional on iterators.
			async cancel() {
				await iterator.return?.();
			},
		});
	}
	/**
	 * Generates the body of the commit request as a sequence of string fragments.
	 *
	 * Emission order:
	 *  1. one "header" JSON line (summary, description, parent commit),
	 *  2. one "lfsFile" JSON line per entry of `lfsFiles`,
	 *  3. one "file" JSON line (base64-encoded content) per entry of `nonLfsFiles`.
	 * Every line after the header is prefixed with "\n".
	 *
	 * Side effects: for each non-LFS file, `file.upload.progressRatio` and `file.upload.state` are
	 * updated and the file's slot in `selectedFiles` is re-assigned to itself.
	 *
	 * @param lfsFiles Files referenced by sha256 and size only.
	 * @param nonLfsFiles Files whose content is embedded in the payload.
	 * @throws Error("upload aborted") when `changeCount` no longer equals the value it had when the
	 * generator started running.
	 */
	async function* commitPayload(lfsFiles: UploadFile[], nonLfsFiles: UploadFile[]): AsyncGenerator<string> {
		const startingChangeCount = changeCount;

		// Stops the generator as soon as `changeCount` has moved since the generator started.
		const ensureNotAborted = (): void => {
			if (changeCount !== startingChangeCount) {
				throw new Error("upload aborted");
			}
		};

		// Path of a file in the commit: prefixed with `path` when `path` is truthy.
		const repoPathOf = (file: UploadFile) => (path ? `${path}/${file.fullPath}` : file.fullPath);

		// Re-assigns an array slot to itself.
		// NOTE(review): presumably there to notify a reactive UI framework of the mutation — confirm.
		const touchSelected = (index: number): void => {
			selectedFiles[index] = selectedFiles[index];
		};

		const selectedCount = selectedFiles.length;
		const headerLine: CommitJsonLine = {
			key: "header",
			value: {
				// A falsy summary falls back to a generated "Upload ..." message.
				summary:
					summary || (selectedCount === 1 ? `Upload ${selectedFiles[0].name}` : `Upload ${selectedCount} files`),
				description,
				parentCommit: latestCommit,
			},
		};
		yield JSON.stringify(headerLine);
		ensureNotAborted();

		for (const lfsFile of lfsFiles) {
			const lfsLine: CommitJsonLine = {
				key: "lfsFile",
				value: {
					oid: lfsFile.upload.sha256!,
					size: lfsFile.size,
					algo: "sha256",
					path: repoPathOf(lfsFile),
				},
			};
			yield "\n" + JSON.stringify(lfsLine);
			ensureNotAborted();
		}

		const canStreamRequest = supportRequestStream();

		for (const regularFile of nonLfsFiles) {
			const slot = selectedFiles.indexOf(regularFile);

			const fileLine: CommitJsonLine = {
				key: "file",
				value: {
					path: repoPathOf(regularFile),
					encoding: "base64",
					content: await blobToBase64(regularFile),
				},
			};
			// The encoding above is asynchronous, so re-check before touching the file's upload state.
			ensureNotAborted();

			const serialized = "\n" + JSON.stringify(fileLine);

			regularFile.upload.progressRatio = 0;
			regularFile.upload.state = "uploading";
			touchSelected(slot);

			if (!canStreamRequest) {
				// Without request streaming the whole line is handed over in one piece.
				yield serialized;
			} else {
				// With request streaming the line is released in pieces and the file's progress ratio is
				// advanced after each piece.
				const pieces = chunk(serialized, 20_000);
				let released = 0;
				while (released < pieces.length) {
					yield pieces[released];
					released += 1;
					regularFile.upload.progressRatio = released / pieces.length;
					touchSelected(slot);
					ensureNotAborted();
				}
			}

			regularFile.upload.state = "completed";
		}
	}
	/**
	 * Sends the strings produced by `source` as the streamed body of a `fetch` request.
	 *
	 * The iterator is wrapped with `iteratorToStream` and piped through a `TextEncoderStream`, so the
	 * body is transmitted as encoded bytes.
	 *
	 * @param source Iterator producing the textual body, fragment by fragment.
	 * @param url Request target.
	 * @param opts `method` and optional `headers` are forwarded to `fetch`; `index` is accepted but not
	 * read by this function.
	 * @returns The promise returned by `fetch`.
	 */
	async function uploadWithProgressFetch(
		source: AsyncIterator<string>,
		url: string,
		opts: {
			headers?: Record<string, string>;
			index?: number;
			method: "PUT" | "POST";
		}
	) {
		const { method, headers } = opts;
		const encodedBody = iteratorToStream(source).pipeThrough(new TextEncoderStream());

		return fetch(url, {
			method,
			headers,
			body: encodedBody,
			// @ts-expect-error chrome support, see https://github.com/whatwg/fetch/issues/1438#issuecomment-1228635501
			duplex: "half",
		});
	}

Metadata

Metadata

Assignees

No one assigned

    Labels

    technical (Advanced stuff!) · wontfix (This will not be worked on)

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions