diff --git a/CHANGELOG.md b/CHANGELOG.md index e47656e6..4ca3c9c1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,19 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ## [Unreleased] +### Added + +- Streamed upload + +### Changed + +- Upload modes available: Stream, Chunked, Compatibility (old slow version) + +### Fixed + +- Requests only retry upon bearer authentication +- Streamed upload isn't stored in memory when request retries + ## [v1.3.4] - 2025-12-10 ### Added diff --git a/kirc-blocking/src/main/kotlin/de/cmdjulian/kirc/client/BlockingContainerImageRegistryClient.kt b/kirc-blocking/src/main/kotlin/de/cmdjulian/kirc/client/BlockingContainerImageRegistryClient.kt index 76d91fe1..9ee9a7bf 100644 --- a/kirc-blocking/src/main/kotlin/de/cmdjulian/kirc/client/BlockingContainerImageRegistryClient.kt +++ b/kirc-blocking/src/main/kotlin/de/cmdjulian/kirc/client/BlockingContainerImageRegistryClient.kt @@ -88,7 +88,12 @@ interface BlockingContainerImageRegistryClient { * * @return the digest of uploaded image */ - fun upload(repository: Repository, reference: Reference, tar: InputStream): Digest + fun upload( + repository: Repository, + reference: Reference, + tar: InputStream, + mode: UploadMode = UploadMode.Stream, + ): Digest /** * Downloads a docker image for certain [reference]. @@ -144,12 +149,13 @@ fun SuspendingContainerImageRegistryClient.toBlockingClient() = object : Blockin return client.toBlockingClient() } - override fun upload(repository: Repository, reference: Reference, tar: InputStream): Digest = + override fun upload(repository: Repository, reference: Reference, tar: InputStream, mode: UploadMode): Digest = runBlocking(Dispatchers.Default) { this@toBlockingClient.upload( repository, reference, tar.asSource().buffered(), + mode, ) } diff --git a/kirc-core/src/main/kotlin/de/cmdjulian/kirc/client/UploadMode.kt b/kirc-core/src/main/kotlin/de/cmdjulian/kirc/client/UploadMode.kt new file mode 100644 index 00000000..27906639 --- /dev/null +++ b/kirc-core/src/main/kotlin/de/cmdjulian/kirc/client/UploadMode.kt @@ -0,0 +1,17 @@ +package de.cmdjulian.kirc.client + +/** + * Determines the way image blobs are uploaded. + * + * Currently supported modes: + * - [Stream]: uploads blob as stream in one request + * - [Chunked]: splits blob into [Chunked.chunkSize] bytes and uploads them one after another + * - [Compatibility]: uploads blob split into 8KB chunks. This is very slow but in some cases works best. Use Chunked instead. + */ +sealed class UploadMode { + data object Stream : UploadMode() + + @Deprecated("Use Chunked and specify the chunk size") + data object Compatibility : UploadMode() + data class Chunked(val chunkSize: Long) : UploadMode() +} diff --git a/kirc-reactive/src/main/kotlin/de/cmdjulian/kirc/client/ReactiveContainerImageRegistryClient.kt b/kirc-reactive/src/main/kotlin/de/cmdjulian/kirc/client/ReactiveContainerImageRegistryClient.kt index 22a1ffa1..526f303e 100644 --- a/kirc-reactive/src/main/kotlin/de/cmdjulian/kirc/client/ReactiveContainerImageRegistryClient.kt +++ b/kirc-reactive/src/main/kotlin/de/cmdjulian/kirc/client/ReactiveContainerImageRegistryClient.kt @@ -89,7 +89,12 @@ interface ReactiveContainerImageRegistryClient { * * @return the digest of uploaded image */ - fun upload(repository: Repository, reference: Reference, tar: Flux): Mono + fun upload( + repository: Repository, + reference: Reference, + tar: Flux, + mode: UploadMode = UploadMode.Stream, + ): Mono /** * Downloads a docker image for certain [reference]. @@ -141,10 +146,11 @@ fun SuspendingContainerImageRegistryClient.toReactiveClient() = object : Reactiv } } - override fun upload(repository: Repository, reference: Reference, tar: Flux): Mono = mono { - val buffer = Buffer().also { buffer -> tar.collect(buffer::writeByte) } - this@toReactiveClient.upload(repository, reference, buffer) - } + override fun upload(repository: Repository, reference: Reference, tar: Flux, mode: UploadMode): Mono = + mono { + val buffer = Buffer().also { buffer -> tar.collect(buffer::writeByte) } + this@toReactiveClient.upload(repository, reference, buffer, mode) + } override fun download(repository: Repository, reference: Reference): Flux = flux { this@toReactiveClient.download(repository, reference).use { result -> diff --git a/kirc-suspending/src/main/kotlin/de/cmdjulian/kirc/client/SuspendingContainerImageRegistryClient.kt b/kirc-suspending/src/main/kotlin/de/cmdjulian/kirc/client/SuspendingContainerImageRegistryClient.kt index 49cd620b..9906964e 100644 --- a/kirc-suspending/src/main/kotlin/de/cmdjulian/kirc/client/SuspendingContainerImageRegistryClient.kt +++ b/kirc-suspending/src/main/kotlin/de/cmdjulian/kirc/client/SuspendingContainerImageRegistryClient.kt @@ -6,6 +6,7 @@ import de.cmdjulian.kirc.image.Repository import de.cmdjulian.kirc.image.Tag import de.cmdjulian.kirc.impl.response.ResultSource import de.cmdjulian.kirc.impl.response.UploadSession +import de.cmdjulian.kirc.spec.UploadBlobPath import de.cmdjulian.kirc.spec.image.ImageConfig import de.cmdjulian.kirc.spec.manifest.Manifest import de.cmdjulian.kirc.spec.manifest.ManifestList @@ -113,9 +114,17 @@ interface SuspendingContainerImageRegistryClient { suspend fun uploadBlobChunks(session: UploadSession, path: Path, chunkSize: Long = 10 * 1048576L): UploadSession /** - * Uploads an entire blob by stream + * Uploads an entire blob chunk-wise for reduced memory load */ - suspend fun uploadBlobStream(session: UploadSession, stream: Source): UploadSession + @Deprecated("Use chunked upload instead") + suspend fun uploadBlobCompatibility(session: UploadSession, path: Path): UploadSession + + /** + * Uploads an entire [blob] by stream + * + * Closes [session] by uploading the whole blob in one monolithic upload and returns its digest upon success. + */ + suspend fun uploadBlobStream(session: UploadSession, blob: UploadBlobPath): Digest /** * Upload a manifest @@ -140,7 +149,12 @@ interface SuspendingContainerImageRegistryClient { * * @return the digest of uploaded image */ - suspend fun upload(repository: Repository, reference: Reference, tar: Source): Digest + suspend fun upload( + repository: Repository, + reference: Reference, + tar: Source, + mode: UploadMode = UploadMode.Stream, + ): Digest /** * Downloads a docker image for certain [reference]. diff --git a/kirc-suspending/src/main/kotlin/de/cmdjulian/kirc/impl/ContainerRegistryApi.kt b/kirc-suspending/src/main/kotlin/de/cmdjulian/kirc/impl/ContainerRegistryApi.kt index 6f25441c..633bb5ec 100644 --- a/kirc-suspending/src/main/kotlin/de/cmdjulian/kirc/impl/ContainerRegistryApi.kt +++ b/kirc-suspending/src/main/kotlin/de/cmdjulian/kirc/impl/ContainerRegistryApi.kt @@ -13,6 +13,7 @@ import de.cmdjulian.kirc.spec.manifest.Manifest import de.cmdjulian.kirc.spec.manifest.ManifestSingle import kotlinx.io.Buffer import kotlinx.io.Source +import java.nio.file.Path /** * Defines the calls to the container registry API @@ -77,8 +78,17 @@ internal interface ContainerRegistryApi { endRange: Long, ): Result - /** Uploads the whole blob data [Source] as stream */ - suspend fun uploadBlobStream(session: UploadSession, source: Source): Result + /** + * Uploads the whole blob data from [path] with [size] and its [digest] as stream + * + * Upon success, should return the same [digest] and closes [session] + */ + suspend fun uploadBlobStream( + session: UploadSession, + path: Path, + size: Long, + digest: Digest, + ): Result /** Retrieve the status of provided [session], returning the range of already uploaded data (start, end) */ suspend fun uploadStatus(session: UploadSession): Result, FuelError> diff --git a/kirc-suspending/src/main/kotlin/de/cmdjulian/kirc/impl/ContainerRegistryApiImpl.kt b/kirc-suspending/src/main/kotlin/de/cmdjulian/kirc/impl/ContainerRegistryApiImpl.kt index 7ccd660e..ca715c84 100644 --- a/kirc-suspending/src/main/kotlin/de/cmdjulian/kirc/impl/ContainerRegistryApiImpl.kt +++ b/kirc-suspending/src/main/kotlin/de/cmdjulian/kirc/impl/ContainerRegistryApiImpl.kt @@ -36,8 +36,9 @@ import de.cmdjulian.kirc.utils.mapToResultSource import de.cmdjulian.kirc.utils.mapToUploadSession import kotlinx.io.Buffer import kotlinx.io.Source -import kotlinx.io.asInputStream import kotlinx.io.readByteArray +import java.nio.file.Path +import kotlin.io.path.inputStream private const val APPLICATION_JSON = "application/json" private const val APPLICATION_OCTET_STREAM = "application/octet-stream" @@ -263,14 +264,23 @@ internal class ContainerRegistryApiImpl(private val fuelManager: FuelManager, cr .let { responseResult -> handler.retryOnUnauthorized(responseResult, EmptyDeserializer) } .mapToUploadSession() - // Currently not working as intended, because internal fuel buffer has an overflow - override suspend fun uploadBlobStream(session: UploadSession, source: Source): Result = - fuelManager.patch(session.location) - .appendHeader(Headers.CONTENT_TYPE, APPLICATION_OCTET_STREAM) - .body(source::asInputStream) - .awaitResponseResult(EmptyDeserializer) - .let { responseResult -> handler.retryOnUnauthorized(responseResult, EmptyDeserializer) } - .mapToUploadSession() + override suspend fun uploadBlobStream( + session: UploadSession, + path: Path, + size: Long, + digest: Digest, + ): Result = fuelManager.put(session.location, listOf("digest" to digest)) + .appendHeader(Headers.CONTENT_TYPE, APPLICATION_OCTET_STREAM) + .body(path::inputStream, { size }) + .awaitResponseResult(EmptyDeserializer) + .let { responseResult -> + handler.retryOnUnauthorized( + responseResult, + EmptyDeserializer, + RetryMode.Stream(path::inputStream, size), + ) + } + .mapToDigest() override suspend fun uploadStatus(session: UploadSession): Result, FuelError> = fuelManager.get(session.location) diff --git a/kirc-suspending/src/main/kotlin/de/cmdjulian/kirc/impl/ResponseRetryWithAuthentication.kt b/kirc-suspending/src/main/kotlin/de/cmdjulian/kirc/impl/ResponseRetryWithAuthentication.kt index 682bcf35..bc77f067 100644 --- a/kirc-suspending/src/main/kotlin/de/cmdjulian/kirc/impl/ResponseRetryWithAuthentication.kt +++ b/kirc-suspending/src/main/kotlin/de/cmdjulian/kirc/impl/ResponseRetryWithAuthentication.kt @@ -14,6 +14,18 @@ import de.cmdjulian.kirc.client.RegistryCredentials import de.cmdjulian.kirc.utils.CaseInsensitiveMap import im.toss.http.parser.HttpAuthCredentials import io.goodforgod.graalvm.hint.annotation.ReflectionHint +import java.io.InputStream + +/** + * Defines body source for the retry method: + * + * - [Default]: copies body from original request + * - [Stream]: adds InputStream as body from provided lambda + */ +internal sealed class RetryMode { + data object Default : RetryMode() + data class Stream(val stream: () -> InputStream, val size: Long) : RetryMode() +} internal class ResponseRetryWithAuthentication( private val credentials: RegistryCredentials?, @@ -22,12 +34,17 @@ internal class ResponseRetryWithAuthentication( suspend fun retryOnUnauthorized( responseResult: ResponseResultOf, deserializer: Deserializable, + mode: RetryMode = RetryMode.Default, ): ResponseResultOf { val (request, response, _) = responseResult val headers = CaseInsensitiveMap(response.headers) if (response.statusCode == 401 && "www-authenticate" in headers) { val retryableRequest = retryRequest(headers["www-authenticate"]?.first(), request) + when (mode) { + is RetryMode.Default -> Unit + is RetryMode.Stream -> retryableRequest?.body(mode.stream, { mode.size }) + } retryableRequest?.let { return it.awaitResponseResult(deserializer) } } diff --git a/kirc-suspending/src/main/kotlin/de/cmdjulian/kirc/impl/SuspendingContainerImageRegistryClientImpl.kt b/kirc-suspending/src/main/kotlin/de/cmdjulian/kirc/impl/SuspendingContainerImageRegistryClientImpl.kt index d950aae9..9e9c6f1b 100644 --- a/kirc-suspending/src/main/kotlin/de/cmdjulian/kirc/impl/SuspendingContainerImageRegistryClientImpl.kt +++ b/kirc-suspending/src/main/kotlin/de/cmdjulian/kirc/impl/SuspendingContainerImageRegistryClientImpl.kt @@ -5,6 +5,7 @@ import com.github.kittinunf.result.map import com.github.kittinunf.result.onError import de.cmdjulian.kirc.client.SuspendingContainerImageClient import de.cmdjulian.kirc.client.SuspendingContainerImageRegistryClient +import de.cmdjulian.kirc.client.UploadMode import de.cmdjulian.kirc.image.ContainerImageName import de.cmdjulian.kirc.image.Digest import de.cmdjulian.kirc.image.Reference @@ -16,6 +17,7 @@ import de.cmdjulian.kirc.impl.response.Catalog import de.cmdjulian.kirc.impl.response.ResultSource import de.cmdjulian.kirc.impl.response.TagList import de.cmdjulian.kirc.impl.response.UploadSession +import de.cmdjulian.kirc.spec.UploadBlobPath import de.cmdjulian.kirc.spec.image.DockerImageConfigV1 import de.cmdjulian.kirc.spec.image.ImageConfig import de.cmdjulian.kirc.spec.image.OciImageConfigV1 @@ -124,8 +126,8 @@ internal class SuspendingContainerImageRegistryClientImpl(private val api: Conta override suspend fun initiateBlobUpload(repository: Repository): UploadSession = api.initiateUpload(repository).getOrElse { throw it.toRegistryClientError(repository, null) } - override suspend fun uploadBlobStream(session: UploadSession, stream: Source): UploadSession = - api.uploadBlobStream(session, stream).getOrElse { throw it.toRegistryClientError() } + override suspend fun uploadBlobStream(session: UploadSession, blob: UploadBlobPath): Digest = + api.uploadBlobStream(session, blob.path, blob.size, blob.digest).getOrElse { throw it.toRegistryClientError() } override suspend fun uploadBlobChunks(session: UploadSession, path: Path, chunkSize: Long): UploadSession = withContext(Dispatchers.IO) { SystemFileSystem.source(path.toKotlinPath()).buffered() }.use { stream -> @@ -141,16 +143,40 @@ internal class SuspendingContainerImageRegistryClientImpl(private val api: Conta } catch (_: EOFException) { // expected behavior when EOF is reached } finally { - val bytesRead = buffer.size - val endRange = startRange + bytesRead - 1 - currentSession = api.uploadBlobChunked(currentSession, buffer, startRange, endRange) - .getOrElse { throw it.toRegistryClientError() } - startRange = endRange + 1 + if (buffer.size > 0) { + val bytesRead = buffer.size + val endRange = startRange + bytesRead - 1 + currentSession = api.uploadBlobChunked(currentSession, buffer, startRange, endRange) + .getOrElse { throw it.toRegistryClientError() } + startRange = endRange + 1 + } } } currentSession } + @Deprecated("Use chunked upload instead") + override suspend fun uploadBlobCompatibility(session: UploadSession, path: Path): UploadSession = + withContext(Dispatchers.IO) { + SystemFileSystem.source(path.toKotlinPath()).buffered() + }.use { stream -> + var returnedSession = session + var startRange = 0L + var endRange: Long + + while (!stream.exhausted()) { + val buffer = Buffer() + // will not read more than 8KB into buffer but this version worked always + stream.readAtMostTo(buffer, 10 * 1048576L) + endRange = startRange + buffer.size - 1 + returnedSession = api.uploadBlobChunked(returnedSession, buffer, startRange, endRange) + .getOrElse { throw it.toRegistryClientError() } + startRange = endRange + } + + returnedSession + } + override suspend fun finishBlobUpload(session: UploadSession, digest: Digest): Digest = api.finishBlobUpload(session, digest) .getOrElse { throw it.toRegistryClientError(null, digest) } @@ -166,8 +192,8 @@ internal class SuspendingContainerImageRegistryClientImpl(private val api: Conta api.uploadManifest(repository, reference, manifest) .getOrElse { throw it.toRegistryClientError(repository, reference) } - override suspend fun upload(repository: Repository, reference: Reference, tar: Source): Digest = - uploader.upload(repository, reference, tar) + override suspend fun upload(repository: Repository, reference: Reference, tar: Source, mode: UploadMode): Digest = + uploader.upload(repository, reference, tar, mode) override suspend fun download(repository: Repository, reference: Reference): Source = downloader.download(repository, reference) diff --git a/kirc-suspending/src/main/kotlin/de/cmdjulian/kirc/impl/delegate/ImageUploader.kt b/kirc-suspending/src/main/kotlin/de/cmdjulian/kirc/impl/delegate/ImageUploader.kt index cbeb16f7..6f838019 100644 --- a/kirc-suspending/src/main/kotlin/de/cmdjulian/kirc/impl/delegate/ImageUploader.kt +++ b/kirc-suspending/src/main/kotlin/de/cmdjulian/kirc/impl/delegate/ImageUploader.kt @@ -2,6 +2,7 @@ package de.cmdjulian.kirc.impl.delegate import de.cmdjulian.kirc.KircUploadException import de.cmdjulian.kirc.client.SuspendingContainerImageRegistryClient +import de.cmdjulian.kirc.client.UploadMode import de.cmdjulian.kirc.image.Digest import de.cmdjulian.kirc.image.Reference import de.cmdjulian.kirc.image.Repository @@ -36,7 +37,12 @@ import kotlin.io.path.pathString internal class ImageUploader(private val client: SuspendingContainerImageRegistryClient, private val tmpPath: Path) { - suspend fun upload(repository: Repository, reference: Reference, tar: Source): Digest = coroutineScope { + suspend fun upload( + repository: Repository, + reference: Reference, + tar: Source, + upload: UploadMode = UploadMode.Stream, + ): Digest = coroutineScope { // store data temporarily val tempDirectory = Path.of( tmpPath.pathString, @@ -62,9 +68,20 @@ internal class ImageUploader(private val client: SuspendingContainerImageRegistr if (!client.existsBlob(repository, blob.digest)) { val session = client.initiateBlobUpload(repository) - val endSession = client.uploadBlobChunks(session, blob.path) + when (upload) { + is UploadMode.Chunked -> { + val endSession = + client.uploadBlobChunks(session, blob.path, upload.chunkSize) + client.finishBlobUpload(endSession, blob.digest) + } - client.finishBlobUpload(endSession, blob.digest) + is UploadMode.Compatibility -> { + val endSession = client.uploadBlobCompatibility(session, blob.path) + client.finishBlobUpload(endSession, blob.digest) + } + + is UploadMode.Stream -> client.uploadBlobStream(session, blob) + } } } } diff --git a/kirc-suspending/src/test/kotlin/de/cmdjulian/kirc/BlockingRegistryTest.kt b/kirc-suspending/src/test/kotlin/de/cmdjulian/kirc/BlockingRegistryTest.kt index da283e6f..1eb4ea10 100644 --- a/kirc-suspending/src/test/kotlin/de/cmdjulian/kirc/BlockingRegistryTest.kt +++ b/kirc-suspending/src/test/kotlin/de/cmdjulian/kirc/BlockingRegistryTest.kt @@ -3,6 +3,7 @@ package de.cmdjulian.kirc import de.cmdjulian.kirc.client.BlockingContainerImageClientFactory import de.cmdjulian.kirc.client.BlockingContainerImageRegistryClient import de.cmdjulian.kirc.client.RegistryCredentials +import de.cmdjulian.kirc.client.UploadMode import de.cmdjulian.kirc.image.Digest import de.cmdjulian.kirc.image.Repository import de.cmdjulian.kirc.image.Tag @@ -151,7 +152,7 @@ internal class BlockingRegistryTest { } @Test - fun `upload - to registry`() { + fun `upload stream - to registry`() { val data = SystemFileSystem.source(Path(helloWorldImage.path)) val repository = Repository("python") val tag = Tag("test") @@ -159,7 +160,38 @@ internal class BlockingRegistryTest { client.exists(repository, tag) shouldBe false shouldNotThrowAny { - client.upload(repository, tag, data.buffered().asInputStream()) + client.upload(repository, tag, data.buffered().asInputStream(), UploadMode.Stream) + } + + client.exists(repository, tag) shouldBe true + } + + @Test + fun `upload chunked - to registry`() { + val data = SystemFileSystem.source(Path(helloWorldImage.path)) + val repository = Repository("python") + val tag = Tag("test") + val upload = UploadMode.Chunked(10 * 1048576L) // 10MB + + client.exists(repository, tag) shouldBe false + + shouldNotThrowAny { + client.upload(repository, tag, data.buffered().asInputStream(), upload) + } + + client.exists(repository, tag) shouldBe true + } + + @Test + fun `upload compatibility - to registry`() { + val data = SystemFileSystem.source(Path(helloWorldImage.path)) + val repository = Repository("python") + val tag = Tag("test") + + client.exists(repository, tag) shouldBe false + + shouldNotThrowAny { + client.upload(repository, tag, data.buffered().asInputStream(), UploadMode.Compatibility) } client.exists(repository, tag) shouldBe true @@ -175,7 +207,7 @@ internal class BlockingRegistryTest { val result = client.download(repository, tag) shouldNotThrowAny { // check if upload of downloaded data possible - client.upload(Repository("test"), Tag("upload"), result) + client.upload(Repository("test"), Tag("upload"), result, UploadMode.Stream) } } diff --git a/kirc-suspending/src/test/kotlin/de/cmdjulian/kirc/DockerRegistryCliHelper.kt b/kirc-suspending/src/test/kotlin/de/cmdjulian/kirc/DockerRegistryCliHelper.kt index af1cf3bb..bde1fbd2 100644 --- a/kirc-suspending/src/test/kotlin/de/cmdjulian/kirc/DockerRegistryCliHelper.kt +++ b/kirc-suspending/src/test/kotlin/de/cmdjulian/kirc/DockerRegistryCliHelper.kt @@ -3,6 +3,7 @@ package de.cmdjulian.kirc import de.cmdjulian.kirc.client.BlockingContainerImageClientFactory import de.cmdjulian.kirc.client.BlockingContainerImageRegistryClient import de.cmdjulian.kirc.client.RegistryCredentials +import de.cmdjulian.kirc.client.UploadMode import de.cmdjulian.kirc.image.Digest import de.cmdjulian.kirc.image.Reference import de.cmdjulian.kirc.image.Repository @@ -26,7 +27,7 @@ class DockerRegistryCliHelper(addressName: String, credentials: RegistryCredenti fun pushImage(repository: Repository, reference: Reference, url: URL): Digest { images.add(UploadReference(repository, reference)) val source = runBlocking(Dispatchers.IO) { SystemFileSystem.source(Path(url.path)) } - return client.upload(repository, reference, source.buffered().asInputStream()) + return client.upload(repository, reference, source.buffered().asInputStream(), UploadMode.Stream) } fun deleteAll() {