Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,19 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## [Unreleased]

### Added

- Streamed upload

### Changed

- Upload modes available: Stream, Chunked, Compatibility (old slow version)

### Fixed

- Requests only retry upon bearer authentication
- Streamed upload isn't stored in memory when request retries

## [v1.3.4] - 2025-12-10

### Added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,12 @@ interface BlockingContainerImageRegistryClient {
*
* @return the digest of uploaded image
*/
fun upload(repository: Repository, reference: Reference, tar: InputStream): Digest
fun upload(
repository: Repository,
reference: Reference,
tar: InputStream,
mode: UploadMode = UploadMode.Stream,
): Digest

/**
* Downloads a docker image for certain [reference].
Expand Down Expand Up @@ -144,12 +149,13 @@ fun SuspendingContainerImageRegistryClient.toBlockingClient() = object : Blockin
return client.toBlockingClient()
}

override fun upload(repository: Repository, reference: Reference, tar: InputStream): Digest =
override fun upload(repository: Repository, reference: Reference, tar: InputStream, mode: UploadMode): Digest =
runBlocking(Dispatchers.Default) {
this@toBlockingClient.upload(
repository,
reference,
tar.asSource().buffered(),
mode,
)
}

Expand Down
17 changes: 17 additions & 0 deletions kirc-core/src/main/kotlin/de/cmdjulian/kirc/client/UploadMode.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package de.cmdjulian.kirc.client

/**
* Determines the way image blobs are uploaded.
*
* Currently supported modes:
* - [Stream]: uploads blob as stream in one request
* - [Chunked]: splits blob into [Chunked.chunkSize] bytes and uploads them one after another
* - [Compatibility]: uploads blob split into 8KB chunks. This is very slow but in some cases works best. Use Chunked instead.
*/
sealed class UploadMode {
data object Stream : UploadMode()

@Deprecated("Use Chunked and specify the chunk size")
data object Compatibility : UploadMode()
data class Chunked(val chunkSize: Long) : UploadMode()
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,12 @@ interface ReactiveContainerImageRegistryClient {
*
* @return the digest of uploaded image
*/
fun upload(repository: Repository, reference: Reference, tar: Flux<Byte>): Mono<Digest>
fun upload(
repository: Repository,
reference: Reference,
tar: Flux<Byte>,
mode: UploadMode = UploadMode.Stream,
): Mono<Digest>

/**
* Downloads a docker image for certain [reference].
Expand Down Expand Up @@ -141,10 +146,11 @@ fun SuspendingContainerImageRegistryClient.toReactiveClient() = object : Reactiv
}
}

override fun upload(repository: Repository, reference: Reference, tar: Flux<Byte>): Mono<Digest> = mono {
val buffer = Buffer().also { buffer -> tar.collect(buffer::writeByte) }
this@toReactiveClient.upload(repository, reference, buffer)
}
override fun upload(repository: Repository, reference: Reference, tar: Flux<Byte>, mode: UploadMode): Mono<Digest> =
mono {
val buffer = Buffer().also { buffer -> tar.collect(buffer::writeByte) }
this@toReactiveClient.upload(repository, reference, buffer, mode)
}

override fun download(repository: Repository, reference: Reference): Flux<Byte> = flux {
this@toReactiveClient.download(repository, reference).use { result ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import de.cmdjulian.kirc.image.Repository
import de.cmdjulian.kirc.image.Tag
import de.cmdjulian.kirc.impl.response.ResultSource
import de.cmdjulian.kirc.impl.response.UploadSession
import de.cmdjulian.kirc.spec.UploadBlobPath
import de.cmdjulian.kirc.spec.image.ImageConfig
import de.cmdjulian.kirc.spec.manifest.Manifest
import de.cmdjulian.kirc.spec.manifest.ManifestList
Expand Down Expand Up @@ -113,9 +114,17 @@ interface SuspendingContainerImageRegistryClient {
suspend fun uploadBlobChunks(session: UploadSession, path: Path, chunkSize: Long = 10 * 1048576L): UploadSession

/**
* Uploads an entire blob by stream
* Uploads an entire blob chunk-wise for reduced memory load
*/
suspend fun uploadBlobStream(session: UploadSession, stream: Source): UploadSession
@Deprecated("Use chunked upload instead")
suspend fun uploadBlobCompatibility(session: UploadSession, path: Path): UploadSession

/**
* Uploads an entire [blob] by stream
*
* Closes [session] by uploading the whole blob in one monolithic upload and returns its digest upon success.
*/
suspend fun uploadBlobStream(session: UploadSession, blob: UploadBlobPath): Digest

/**
* Upload a manifest
Expand All @@ -140,7 +149,12 @@ interface SuspendingContainerImageRegistryClient {
*
* @return the digest of uploaded image
*/
suspend fun upload(repository: Repository, reference: Reference, tar: Source): Digest
suspend fun upload(
repository: Repository,
reference: Reference,
tar: Source,
mode: UploadMode = UploadMode.Stream,
): Digest

/**
* Downloads a docker image for certain [reference].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import de.cmdjulian.kirc.spec.manifest.Manifest
import de.cmdjulian.kirc.spec.manifest.ManifestSingle
import kotlinx.io.Buffer
import kotlinx.io.Source
import java.nio.file.Path

/**
* Defines the calls to the container registry API
Expand Down Expand Up @@ -77,8 +78,17 @@ internal interface ContainerRegistryApi {
endRange: Long,
): Result<UploadSession, FuelError>

/** Uploads the whole blob data [Source] as stream */
suspend fun uploadBlobStream(session: UploadSession, source: Source): Result<UploadSession, FuelError>
/**
* Uploads the whole blob data from [path] with [size] and its [digest] as stream
*
* Upon success, should return the same [digest] and closes [session]
*/
suspend fun uploadBlobStream(
session: UploadSession,
path: Path,
size: Long,
digest: Digest,
): Result<Digest, FuelError>

/** Retrieve the status of provided [session], returning the range of already uploaded data (start, end) */
suspend fun uploadStatus(session: UploadSession): Result<Pair<Long, Long>, FuelError>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ import de.cmdjulian.kirc.utils.mapToResultSource
import de.cmdjulian.kirc.utils.mapToUploadSession
import kotlinx.io.Buffer
import kotlinx.io.Source
import kotlinx.io.asInputStream
import kotlinx.io.readByteArray
import java.nio.file.Path
import kotlin.io.path.inputStream

private const val APPLICATION_JSON = "application/json"
private const val APPLICATION_OCTET_STREAM = "application/octet-stream"
Expand Down Expand Up @@ -263,14 +264,23 @@ internal class ContainerRegistryApiImpl(private val fuelManager: FuelManager, cr
.let { responseResult -> handler.retryOnUnauthorized(responseResult, EmptyDeserializer) }
.mapToUploadSession()

// Currently not working as intended, because internal fuel buffer has an overflow
override suspend fun uploadBlobStream(session: UploadSession, source: Source): Result<UploadSession, FuelError> =
fuelManager.patch(session.location)
.appendHeader(Headers.CONTENT_TYPE, APPLICATION_OCTET_STREAM)
.body(source::asInputStream)
.awaitResponseResult(EmptyDeserializer)
.let { responseResult -> handler.retryOnUnauthorized(responseResult, EmptyDeserializer) }
.mapToUploadSession()
override suspend fun uploadBlobStream(
session: UploadSession,
path: Path,
size: Long,
digest: Digest,
): Result<Digest, FuelError> = fuelManager.put(session.location, listOf("digest" to digest))
.appendHeader(Headers.CONTENT_TYPE, APPLICATION_OCTET_STREAM)
.body(path::inputStream, { size })
.awaitResponseResult(EmptyDeserializer)
.let { responseResult ->
handler.retryOnUnauthorized(
responseResult,
EmptyDeserializer,
RetryMode.Stream(path::inputStream, size),
)
}
.mapToDigest()

override suspend fun uploadStatus(session: UploadSession): Result<Pair<Long, Long>, FuelError> =
fuelManager.get(session.location)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,18 @@ import de.cmdjulian.kirc.client.RegistryCredentials
import de.cmdjulian.kirc.utils.CaseInsensitiveMap
import im.toss.http.parser.HttpAuthCredentials
import io.goodforgod.graalvm.hint.annotation.ReflectionHint
import java.io.InputStream

/**
* Defines body source for the retry method:
*
* - [Default]: copies body from original request
* - [Stream]: adds InputStream as body from provided lambda
*/
internal sealed class RetryMode {
data object Default : RetryMode()
data class Stream(val stream: () -> InputStream, val size: Long) : RetryMode()
}

internal class ResponseRetryWithAuthentication(
private val credentials: RegistryCredentials?,
Expand All @@ -22,12 +34,17 @@ internal class ResponseRetryWithAuthentication(
suspend fun <T : Any> retryOnUnauthorized(
responseResult: ResponseResultOf<T>,
deserializer: Deserializable<T>,
mode: RetryMode = RetryMode.Default,
): ResponseResultOf<T> {
val (request, response, _) = responseResult
val headers = CaseInsensitiveMap(response.headers)

if (response.statusCode == 401 && "www-authenticate" in headers) {
val retryableRequest = retryRequest(headers["www-authenticate"]?.first(), request)
when (mode) {
is RetryMode.Default -> Unit
is RetryMode.Stream -> retryableRequest?.body(mode.stream, { mode.size })
}
retryableRequest?.let { return it.awaitResponseResult(deserializer) }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.github.kittinunf.result.map
import com.github.kittinunf.result.onError
import de.cmdjulian.kirc.client.SuspendingContainerImageClient
import de.cmdjulian.kirc.client.SuspendingContainerImageRegistryClient
import de.cmdjulian.kirc.client.UploadMode
import de.cmdjulian.kirc.image.ContainerImageName
import de.cmdjulian.kirc.image.Digest
import de.cmdjulian.kirc.image.Reference
Expand All @@ -16,6 +17,7 @@ import de.cmdjulian.kirc.impl.response.Catalog
import de.cmdjulian.kirc.impl.response.ResultSource
import de.cmdjulian.kirc.impl.response.TagList
import de.cmdjulian.kirc.impl.response.UploadSession
import de.cmdjulian.kirc.spec.UploadBlobPath
import de.cmdjulian.kirc.spec.image.DockerImageConfigV1
import de.cmdjulian.kirc.spec.image.ImageConfig
import de.cmdjulian.kirc.spec.image.OciImageConfigV1
Expand Down Expand Up @@ -124,8 +126,8 @@ internal class SuspendingContainerImageRegistryClientImpl(private val api: Conta
override suspend fun initiateBlobUpload(repository: Repository): UploadSession =
api.initiateUpload(repository).getOrElse { throw it.toRegistryClientError(repository, null) }

override suspend fun uploadBlobStream(session: UploadSession, stream: Source): UploadSession =
api.uploadBlobStream(session, stream).getOrElse { throw it.toRegistryClientError() }
override suspend fun uploadBlobStream(session: UploadSession, blob: UploadBlobPath): Digest =
api.uploadBlobStream(session, blob.path, blob.size, blob.digest).getOrElse { throw it.toRegistryClientError() }

override suspend fun uploadBlobChunks(session: UploadSession, path: Path, chunkSize: Long): UploadSession =
withContext(Dispatchers.IO) { SystemFileSystem.source(path.toKotlinPath()).buffered() }.use { stream ->
Expand All @@ -141,16 +143,40 @@ internal class SuspendingContainerImageRegistryClientImpl(private val api: Conta
} catch (_: EOFException) {
// expected behavior when EOF is reached
} finally {
val bytesRead = buffer.size
val endRange = startRange + bytesRead - 1
currentSession = api.uploadBlobChunked(currentSession, buffer, startRange, endRange)
.getOrElse { throw it.toRegistryClientError() }
startRange = endRange + 1
if (buffer.size > 0) {
val bytesRead = buffer.size
val endRange = startRange + bytesRead - 1
currentSession = api.uploadBlobChunked(currentSession, buffer, startRange, endRange)
.getOrElse { throw it.toRegistryClientError() }
startRange = endRange + 1
}
}
}
currentSession
}

@Deprecated("Use chunked upload instead")
override suspend fun uploadBlobCompatibility(session: UploadSession, path: Path): UploadSession =
withContext(Dispatchers.IO) {
SystemFileSystem.source(path.toKotlinPath()).buffered()
}.use { stream ->
var returnedSession = session
var startRange = 0L
var endRange: Long

while (!stream.exhausted()) {
val buffer = Buffer()
// will not read more than 8KB into buffer but this version worked always
stream.readAtMostTo(buffer, 10 * 1048576L)
endRange = startRange + buffer.size - 1
returnedSession = api.uploadBlobChunked(returnedSession, buffer, startRange, endRange)
.getOrElse { throw it.toRegistryClientError() }
startRange = endRange
}

returnedSession
}

override suspend fun finishBlobUpload(session: UploadSession, digest: Digest): Digest =
api.finishBlobUpload(session, digest)
.getOrElse { throw it.toRegistryClientError(null, digest) }
Expand All @@ -166,8 +192,8 @@ internal class SuspendingContainerImageRegistryClientImpl(private val api: Conta
api.uploadManifest(repository, reference, manifest)
.getOrElse { throw it.toRegistryClientError(repository, reference) }

override suspend fun upload(repository: Repository, reference: Reference, tar: Source): Digest =
uploader.upload(repository, reference, tar)
override suspend fun upload(repository: Repository, reference: Reference, tar: Source, mode: UploadMode): Digest =
uploader.upload(repository, reference, tar, mode)

override suspend fun download(repository: Repository, reference: Reference): Source =
downloader.download(repository, reference)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package de.cmdjulian.kirc.impl.delegate

import de.cmdjulian.kirc.KircUploadException
import de.cmdjulian.kirc.client.SuspendingContainerImageRegistryClient
import de.cmdjulian.kirc.client.UploadMode
import de.cmdjulian.kirc.image.Digest
import de.cmdjulian.kirc.image.Reference
import de.cmdjulian.kirc.image.Repository
Expand Down Expand Up @@ -36,7 +37,12 @@ import kotlin.io.path.pathString

internal class ImageUploader(private val client: SuspendingContainerImageRegistryClient, private val tmpPath: Path) {

suspend fun upload(repository: Repository, reference: Reference, tar: Source): Digest = coroutineScope {
suspend fun upload(
repository: Repository,
reference: Reference,
tar: Source,
upload: UploadMode = UploadMode.Stream,
): Digest = coroutineScope {
// store data temporarily
val tempDirectory = Path.of(
tmpPath.pathString,
Expand All @@ -62,9 +68,20 @@ internal class ImageUploader(private val client: SuspendingContainerImageRegistr
if (!client.existsBlob(repository, blob.digest)) {
val session = client.initiateBlobUpload(repository)

val endSession = client.uploadBlobChunks(session, blob.path)
when (upload) {
is UploadMode.Chunked -> {
val endSession =
client.uploadBlobChunks(session, blob.path, upload.chunkSize)
client.finishBlobUpload(endSession, blob.digest)
}

client.finishBlobUpload(endSession, blob.digest)
is UploadMode.Compatibility -> {
val endSession = client.uploadBlobCompatibility(session, blob.path)
client.finishBlobUpload(endSession, blob.digest)
}

is UploadMode.Stream -> client.uploadBlobStream(session, blob)
}
}
}
}
Expand Down
Loading