Skip to content

Commit 00f7f50

Browse files
committed
feat: support s3 urls for input and output
This commit adds support for using s3 URLs in the format s3://<BUCKET>/<KEY> for both input and output. If an s3 URL is used as input, a presigned URL is created and used as input to ffmpeg. The duration of the presigned URLs can be controlled with the 'remote-files.s3.presignDurationSeconds' config property. If an s3 URL is used for 'outputFolder', output will first be stored locally and then uploaded to s3 once transcoding is finished. AWS credentials are read with DefaultCredentialsProvider, meaning AWS credentials can be provided in a number of ways, see https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/auth/credentials/DefaultCredentialsProvider.html. Note that when using s3 URLs for input, the presigned URLs will be shown in the logs. If this is not desirable, setting logging.config (or env variable LOGGING_CONFIG) to 'classpath:logback-json-mask-s3-presign.xml' will use a log config that masks the presign query parameters. By setting env variable REMOTEFILES_S3_ANONYMOUSACCESS to true, s3 URLs will be accessed in anonymous mode, corresponding to using the '--no-sign-request' flag with the aws cli. Any s3 access key or secret key configured will be ignored. Multipart upload will be disabled in this case since the s3 sdk does not support multipart upload when using anonymous access. S3 multipart upload can be configured through configuration properties: - remote-files.s3.multipart.minimumPartSize - remote-files.s3.multipart.threshold - remote-files.s3.multipart.apicallbuffersize These properties correspond to the properties in software.amazon.awssdk.services.s3.multipart.MultipartConfiguration. See https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/s3/multipart/MultipartConfiguration.html Signed-off-by: Gustav Grusell <[email protected]>
1 parent 58230f8 commit 00f7f50

File tree

21 files changed

+722
-40
lines changed

21 files changed

+722
-40
lines changed

checks.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ jacocoTestCoverageVerification {
1111
'*.static {...}',
1212
'*.model.*.get*',
1313
'*.service.localencode.LocalEncodeService.moveFile*',
14+
'*.S3Properties*.get*()',
15+
'*RemoteFileService.DefaultHandler.*',
1416
'*QueueService.getQueue*',
1517
'*QueueService.migrateQueues()',
1618
'*.ShutdownHandler.*',

encore-common/build.gradle.kts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ dependencies {
1616

1717
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core")
1818
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-slf4j")
19+
implementation(platform("software.amazon.awssdk:bom:2.33.9"))
20+
implementation("software.amazon.awssdk:s3")
21+
implementation("software.amazon.awssdk:s3-transfer-manager")
1922

2023
testImplementation(project(":encore-web"))
2124
testImplementation("org.springframework.security:spring-security-test")
@@ -26,6 +29,7 @@ dependencies {
2629
testFixturesImplementation("com.redis:testcontainers-redis:2.2.4")
2730
testFixturesImplementation("io.github.oshai:kotlin-logging-jvm:7.0.13")
2831
testFixturesImplementation("org.junit.jupiter:junit-jupiter-api")
32+
testFixturesImplementation("org.testcontainers:localstack:1.20.3")
2933
testFixturesRuntimeOnly("org.junit.platform:junit-platform-launcher")
3034
}
3135

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// SPDX-FileCopyrightText: 2024 Eyevinn Technology AB
2+
//
3+
// SPDX-License-Identifier: EUPL-1.2
4+
5+
package se.svt.oss.encore
6+
7+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
8+
import org.springframework.boot.context.properties.EnableConfigurationProperties
9+
import org.springframework.context.annotation.Bean
10+
import org.springframework.context.annotation.Configuration
11+
import se.svt.oss.encore.service.remotefiles.s3.S3Properties
12+
import se.svt.oss.encore.service.remotefiles.s3.S3RemoteFileHandler
13+
import se.svt.oss.encore.service.remotefiles.s3.S3UriConverter
14+
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider
15+
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
16+
import software.amazon.awssdk.regions.Region
17+
import software.amazon.awssdk.services.s3.S3AsyncClient
18+
import software.amazon.awssdk.services.s3.S3Configuration
19+
import software.amazon.awssdk.services.s3.presigner.S3Presigner
20+
import software.amazon.awssdk.transfer.s3.S3TransferManager
21+
import java.net.URI
22+
23+
/**
 * Spring configuration that wires the AWS SDK beans used for s3 remote file support.
 *
 * The configuration is only active when the `remote-files.s3.enabled` property is `true`.
 */
@ConditionalOnProperty("remote-files.s3.enabled", havingValue = "true")
@EnableConfigurationProperties(S3Properties::class)
@Configuration
class S3RemoteFilesConfiguration {

    /**
     * Region shared by the s3 beans: the `aws.region` system property if set, otherwise the
     * `AWS_REGION` environment variable, otherwise `us-east-1`.
     *
     * Blank values are treated as "not set" so that an empty `AWS_REGION` (e.g. from an unset
     * variable substitution) falls through to the next source instead of being handed to
     * [Region.of] as a blank region id.
     */
    @Bean
    fun s3Region(): Region {
        val configuredRegion = listOf<String?>(
            System.getProperty("aws.region"),
            System.getenv("AWS_REGION"),
        ).firstOrNull { !it.isNullOrBlank() }
        return Region.of(configuredRegion ?: "us-east-1")
    }

    /**
     * Transfer manager backed by the shared [S3AsyncClient].
     *
     * `s3Properties` is currently unused here; it is kept so the bean method signature is unchanged.
     */
    @Bean
    fun transferManager(s3Client: S3AsyncClient, s3Properties: S3Properties): S3TransferManager =
        S3TransferManager.builder()
            .s3Client(s3Client)
            .build()

    /**
     * Async s3 client configured from [S3Properties]: multipart settings, path style access,
     * credentials (anonymous or the default provider chain) and an optional endpoint override.
     */
    @Bean
    fun s3Client(s3Region: Region, s3Properties: S3Properties): S3AsyncClient = S3AsyncClient.builder()
        .region(s3Region)
        .crossRegionAccessEnabled(true)
        .multipartConfiguration { multipartConfigurationBuilder ->
            multipartConfigurationBuilder
                .minimumPartSizeInBytes(s3Properties.multipart.minimumPartSize)
                .thresholdInBytes(s3Properties.multipart.threshold)
                .apiCallBufferSizeInBytes(s3Properties.multipart.apiCallBufferSize)
        }
        .multipartEnabled(!s3Properties.anonymousAccess) // Multipart upload requires credentials
        .serviceConfiguration(
            S3Configuration.builder()
                .pathStyleAccessEnabled(s3Properties.usePathStyle)
                .build(),
        )
        .credentialsProvider(
            if (s3Properties.anonymousAccess) {
                AnonymousCredentialsProvider.create()
            } else {
                DefaultCredentialsProvider.builder().build()
            },
        )
        .apply {
            // Only override the endpoint when one is configured.
            if (s3Properties.endpoint.isNotBlank()) {
                endpointOverride(URI.create(s3Properties.endpoint))
            }
        }
        .build()

    /**
     * Presigner used to create presigned URLs, with the same region and optional endpoint
     * override as the s3 client.
     */
    @Bean
    fun s3Presigner(s3Region: Region, s3Properties: S3Properties): S3Presigner = S3Presigner.builder()
        .region(s3Region)
        .serviceConfiguration(
            S3Configuration.builder()
                // NOTE(review): path style is always enabled here, whereas s3Client honours
                // s3Properties.usePathStyle - confirm that this difference is intentional.
                .pathStyleAccessEnabled(true)
                .build(),
        )
        .apply {
            // Same blank check as in s3Client; endpoint is a non-null String there.
            if (s3Properties.endpoint.isNotBlank()) {
                endpointOverride(URI.create(s3Properties.endpoint))
            }
        }
        .build()

    /** Converter for s3 URIs, built from the s3 properties and the resolved region. */
    @Bean
    fun s3UriConverter(s3Properties: S3Properties, s3Region: Region): S3UriConverter =
        S3UriConverter(s3Properties, s3Region)

    /** The s3 [S3RemoteFileHandler], assembled from the beans defined in this configuration. */
    @Bean
    fun s3RemoteFileHandler(
        s3Client: S3AsyncClient,
        s3Presigner: S3Presigner,
        s3Properties: S3Properties,
        s3UriConverter: S3UriConverter,
        transferManager: S3TransferManager,
    ): S3RemoteFileHandler =
        S3RemoteFileHandler(s3Client, s3Presigner, s3Properties, s3UriConverter, transferManager)
}

encore-common/src/main/kotlin/se/svt/oss/encore/model/input/Input.kt

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ sealed interface Input {
3535
@get:Schema(description = "URI of input file", required = true, example = "/path/to/file.mp4")
3636
val uri: String
3737

38+
var accessUri: String
39+
3840
@get:Schema(description = "Input params required to properly decode input", example = """{ "ac": "2" }""")
3941
val params: LinkedHashMap<String, String?>
4042

@@ -167,6 +169,9 @@ data class AudioInput(
167169
override val type: String
168170
get() = TYPE_AUDIO
169171

172+
@JsonIgnore
173+
override var accessUri: String = uri
174+
170175
override fun withSeekTo(seekTo: Double) = copy(seekTo = seekTo)
171176

172177
val duration: Double
@@ -188,6 +193,9 @@ data class VideoInput(
188193
override val seekTo: Double? = null,
189194
override val copyTs: Boolean = false,
190195
) : VideoIn {
196+
@JsonIgnore
197+
override var accessUri: String = uri
198+
191199
override val analyzedVideo: VideoFile
192200
@JsonIgnore
193201
get() = analyzed as? VideoFile ?: throw RuntimeException("Analyzed video for $uri is ${analyzed?.type}")
@@ -221,6 +229,9 @@ data class AudioVideoInput(
221229
override val copyTs: Boolean = false,
222230
) : VideoIn,
223231
AudioIn {
232+
@JsonIgnore
233+
override var accessUri: String = uri
234+
224235
override val analyzedVideo: VideoFile
225236
@JsonIgnore
226237
get() = analyzed as? VideoFile ?: throw RuntimeException("Analyzed audio/video for $uri is ${analyzed?.type}")
@@ -245,7 +256,7 @@ fun List<Input>.inputParams(readDuration: Double?): List<String> =
245256
(readDuration?.let { listOf("-t", "$it") } ?: emptyList()) +
246257
(input.seekTo?.let { listOf("-ss", "$it") } ?: emptyList()) +
247258
(if (input.copyTs) listOf("-copyts") else emptyList()) +
248-
listOf("-i", input.uri)
259+
listOf("-i", input.accessUri)
249260
}
250261

251262
fun List<Input>.maxDuration(): Double? = maxOfOrNull {

encore-common/src/main/kotlin/se/svt/oss/encore/service/EncoreService.kt

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ import se.svt.oss.encore.service.callback.CallbackService
4545
import se.svt.oss.encore.service.localencode.LocalEncodeService
4646
import se.svt.oss.encore.service.mediaanalyzer.MediaAnalyzerService
4747
import se.svt.oss.encore.service.queue.QueueService
48+
import se.svt.oss.encore.service.remotefiles.RemoteFileService
4849
import se.svt.oss.mediaanalyzer.file.MediaContainer
4950
import se.svt.oss.mediaanalyzer.file.MediaFile
5051
import java.io.File
@@ -67,6 +68,7 @@ class EncoreService(
6768
private val localEncodeService: LocalEncodeService,
6869
private val encoreProperties: EncoreProperties,
6970
private val queueService: QueueService,
71+
private val remoteFileService: RemoteFileService,
7072
) {
7173

7274
private val cancelTopicName = "cancel"
@@ -227,7 +229,7 @@ class EncoreService(
227229
callbackService.sendProgressCallback(encoreJob)
228230
} finally {
229231
redisMessageListerenerContainer.removeMessageListener(cancelListener)
230-
localEncodeService.cleanup(outputFolder)
232+
localEncodeService.cleanup(outputFolder, encoreJob)
231233
}
232234
}
233235

@@ -270,6 +272,10 @@ class EncoreService(
270272
}
271273

272274
private fun initJob(encoreJob: EncoreJob) {
275+
encoreJob.inputs.forEach { input ->
276+
input.accessUri = remoteFileService.getAccessUri(input.uri)
277+
}
278+
273279
encoreJob.inputs.forEach { input ->
274280
mediaAnalyzerService.analyzeInput(input)
275281
}

encore-common/src/main/kotlin/se/svt/oss/encore/service/localencode/LocalEncodeService.kt

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,13 @@ import org.springframework.stereotype.Service
99
import se.svt.oss.encore.config.EncoreProperties
1010
import se.svt.oss.encore.model.EncoreJob
1111
import se.svt.oss.encore.process.createTempDir
12+
import se.svt.oss.encore.service.remotefiles.RemoteFileService
1213
import se.svt.oss.mediaanalyzer.file.AudioFile
1314
import se.svt.oss.mediaanalyzer.file.ImageFile
1415
import se.svt.oss.mediaanalyzer.file.MediaFile
1516
import se.svt.oss.mediaanalyzer.file.VideoFile
1617
import java.io.File
18+
import java.net.URI
1719
import java.nio.file.Files
1820
import java.nio.file.Path
1921
import java.nio.file.StandardCopyOption
@@ -23,11 +25,12 @@ private val log = KotlinLogging.logger {}
2325
@Service
2426
class LocalEncodeService(
2527
private val encoreProperties: EncoreProperties,
28+
private val remoteFileService: RemoteFileService,
2629
) {
2730

2831
fun outputFolder(
2932
encoreJob: EncoreJob,
30-
): String = if (encoreProperties.localTemporaryEncode) {
33+
): String = if (encoreProperties.localTemporaryEncode || remoteFileService.isRemoteFile(encoreJob.outputFolder)) {
3134
createTempDir("job_${encoreJob.id}").toString()
3235
} else {
3336
encoreJob.outputFolder
@@ -38,6 +41,23 @@ class LocalEncodeService(
3841
output: List<MediaFile>,
3942
encoreJob: EncoreJob,
4043
): List<MediaFile> {
44+
if (remoteFileService.isRemoteFile(encoreJob.outputFolder)) {
45+
log.debug { "Moving files to output destination ${encoreJob.outputFolder}, from local temp $outputFolder" }
46+
File(outputFolder).listFiles()?.forEach { localFile ->
47+
val remoteFile = URI.create(encoreJob.outputFolder).resolve(localFile.name).toString()
48+
remoteFileService.upload(localFile.toString(), remoteFile)
49+
}
50+
val files = output.map {
51+
val resolvedPath = URI.create(encoreJob.outputFolder).resolve(Path.of(it.file).fileName.toString()).toString()
52+
when (it) {
53+
is VideoFile -> it.copy(file = resolvedPath)
54+
is AudioFile -> it.copy(file = resolvedPath)
55+
is ImageFile -> it.copy(file = resolvedPath)
56+
else -> throw Exception("Invalid conversion")
57+
}
58+
}
59+
return files
60+
}
4161
if (encoreProperties.localTemporaryEncode) {
4262
val destination = File(encoreJob.outputFolder)
4363
log.debug { "Moving files to correct outputFolder ${encoreJob.outputFolder}, from local temp $outputFolder" }
@@ -50,8 +70,10 @@ class LocalEncodeService(
5070
return output
5171
}
5272

53-
fun cleanup(tempDirectory: String?) {
54-
if (tempDirectory != null && encoreProperties.localTemporaryEncode) {
73+
fun cleanup(tempDirectory: String?, encoreJob: EncoreJob) {
74+
if (tempDirectory != null &&
75+
(encoreProperties.localTemporaryEncode || remoteFileService.isRemoteFile(encoreJob.outputFolder))
76+
) {
5577
File(tempDirectory).deleteRecursively()
5678
}
5779
}

encore-common/src/main/kotlin/se/svt/oss/encore/service/mediaanalyzer/MediaAnalyzerService.kt

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import se.svt.oss.mediaanalyzer.ffprobe.SideData
2222
import se.svt.oss.mediaanalyzer.ffprobe.UnknownSideData
2323
import se.svt.oss.mediaanalyzer.ffprobe.UnknownStream
2424
import se.svt.oss.mediaanalyzer.file.AudioFile
25+
import se.svt.oss.mediaanalyzer.file.ImageFile
26+
import se.svt.oss.mediaanalyzer.file.SubtitleFile
2527
import se.svt.oss.mediaanalyzer.file.VideoFile
2628
import se.svt.oss.mediaanalyzer.mediainfo.AudioTrack
2729
import se.svt.oss.mediaanalyzer.mediainfo.GeneralTrack
@@ -58,20 +60,25 @@ class MediaAnalyzerService(private val mediaAnalyzer: MediaAnalyzer) {
5860
val useFirstAudioStreams = (input as? AudioIn)?.channelLayout?.channels?.size
5961

6062
input.analyzed = mediaAnalyzer.analyze(
61-
file = input.uri,
63+
file = input.accessUri,
6264
probeInterlaced = probeInterlaced,
6365
ffprobeInputParams = input.params,
64-
).let {
65-
val selectedVideoStream = (input as? VideoIn)?.videoStream
66-
val selectedAudioStream = (input as? AudioIn)?.audioStream
67-
when (it) {
68-
is VideoFile -> it.selectVideoStream(selectedVideoStream)
69-
.selectAudioStream(selectedAudioStream)
70-
.trimAudio(useFirstAudioStreams)
71-
is AudioFile -> it.selectAudioStream(selectedAudioStream)
72-
.trimAudio(useFirstAudioStreams)
73-
else -> it
66+
)
67+
.let {
68+
val selectedVideoStream = (input as? VideoIn)?.videoStream
69+
val selectedAudioStream = (input as? AudioIn)?.audioStream
70+
when (it) {
71+
is VideoFile -> it.selectVideoStream(selectedVideoStream)
72+
.selectAudioStream(selectedAudioStream)
73+
.trimAudio(useFirstAudioStreams)
74+
.copy(file = input.uri)
75+
is AudioFile -> it.selectAudioStream(selectedAudioStream)
76+
.trimAudio(useFirstAudioStreams)
77+
.copy(file = input.uri)
78+
is ImageFile -> it.copy(file = input.uri)
79+
is SubtitleFile -> it.copy(file = input.uri)
80+
else -> it
81+
}
7482
}
75-
}
7683
}
7784
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
// SPDX-FileCopyrightText: 2024 Eyevinn Technology AB
2+
//
3+
// SPDX-License-Identifier: EUPL-1.2
4+
5+
package se.svt.oss.encore.service.remotefiles
6+
7+
/**
 * Strategy for reading from and uploading to files addressed by one or more URI schemes.
 */
interface RemoteFileHandler {
    /** URI schemes handled by this handler; compared against the scheme of the requested URI. */
    val protocols: List<String>

    /** Returns the URI that should be used to access [uri]. */
    fun getAccessUri(uri: String): String

    /** Uploads the file at [localFile] to the destination [remoteFile]. */
    fun upload(localFile: String, remoteFile: String)
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// SPDX-FileCopyrightText: 2024 Eyevinn Technology AB
2+
//
3+
// SPDX-License-Identifier: EUPL-1.2
4+
5+
package se.svt.oss.encore.service.remotefiles
6+
7+
import io.github.oshai.kotlinlogging.KotlinLogging
8+
import org.springframework.stereotype.Service
9+
import java.net.URI
10+
11+
private val log = KotlinLogging.logger {}
12+
13+
/**
 * Routes URIs and paths to the [RemoteFileHandler] registered for their scheme.
 *
 * Local paths, `file:` URIs and schemes without a registered handler are served by a
 * pass-through default handler.
 */
@Service
class RemoteFileService(private val remoteFileHandlers: List<RemoteFileHandler>) {

    private val defaultHandler = DefaultHandler()

    /**
     * Returns `true` when [uriOrPath] has a URI scheme other than `file`.
     *
     * The scheme is extracted without parsing the whole string as a URI, so plain local paths
     * containing characters that are illegal in URIs (such as spaces) are classified as local
     * instead of causing an [IllegalArgumentException].
     */
    fun isRemoteFile(uriOrPath: String): Boolean = !isLocalScheme(schemeOf(uriOrPath))

    /** Returns the URI to use when reading [uriOrPath], as decided by the matching handler. */
    fun getAccessUri(uriOrPath: String): String =
        getHandler(uriOrPath).getAccessUri(uriOrPath)

    /**
     * Uploads [localFile] to [remoteFile] using the handler registered for the destination scheme.
     *
     * @throws UnsupportedOperationException if [remoteFile] is remote but no handler is registered
     * for its scheme. The default handler cannot upload anything, and silently skipping the upload
     * would leave the caller believing the output had been stored.
     */
    fun upload(localFile: String, remoteFile: String) {
        val handler = getHandler(remoteFile)
        if (handler === defaultHandler && isRemoteFile(remoteFile)) {
            throw UnsupportedOperationException(
                "No remote file handler available for uploading $localFile to $remoteFile",
            )
        }
        handler.upload(localFile, remoteFile)
    }

    private fun getHandler(uriOrPath: String): RemoteFileHandler {
        log.info { "Getting handler for uri $uriOrPath. Available protocols: ${remoteFileHandlers.flatMap {it.protocols} }" }
        val scheme = schemeOf(uriOrPath)
        if (scheme == null || isLocalScheme(scheme)) {
            return defaultHandler
        }
        // URI schemes are case-insensitive, so "S3://..." must match a handler declaring "s3".
        val handler = remoteFileHandlers.firstOrNull { candidate ->
            candidate.protocols.any { it.equals(scheme, ignoreCase = true) }
        }
        if (handler != null) {
            return handler
        }
        log.info { "No remote file handler found for protocol $scheme. Using default handler." }
        return defaultHandler
    }

    /** A missing scheme or the `file` scheme (in any letter case) denotes a local file. */
    private fun isLocalScheme(scheme: String?): Boolean =
        scheme.isNullOrEmpty() || scheme.equals("file", ignoreCase = true)

    /** Extracts the scheme of [uriOrPath] in its original letter case, or `null` if it has none. */
    private fun schemeOf(uriOrPath: String): String? =
        SCHEME_PATTERN.find(uriOrPath)?.groupValues?.get(1)

    /** Handler used for protocols where no specific handler is defined. Works for local files and
     * any protocols that ffmpeg supports natively */
    private class DefaultHandler : RemoteFileHandler {
        override fun getAccessUri(uri: String): String = uri

        override fun upload(localFile: String, remoteFile: String) {
            // Do nothing
        }

        override val protocols: List<String> = emptyList()
    }

    private companion object {
        // Leading URI scheme: a letter followed by letters, digits, '+', '-' or '.', then ':'.
        // At least two characters are required so that a Windows drive letter ("C:...") is
        // treated as a local path rather than as a scheme.
        private val SCHEME_PATTERN = Regex("^([A-Za-z][A-Za-z0-9+.-]+):")
    }
}

0 commit comments

Comments
 (0)