Skip to content

Commit 8b1e1d2

Browse files
committed
feat: migrate from webflux to ktor
Replaced the entire spring webflux/reactor pipeline with a ktor based http clients. Spring Boot remains only for dependency injection and configuration.
1 parent c649fc4 commit 8b1e1d2

16 files changed

+287
-272
lines changed

build.gradle.kts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,15 @@ repositories {
2626

2727
dependencies {
2828
// Dependencies
29-
implementation("org.springframework.boot:spring-boot-starter-webflux")
29+
implementation("org.springframework.boot:spring-boot-starter-json")
3030
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
3131
implementation("org.jetbrains.kotlin:kotlin-reflect")
3232
implementation("nl.basjes.parse.useragent:yauaa:7.32.0")
3333
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core")
34-
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
34+
implementation("io.ktor:ktor-client-core:3.3.2")
35+
implementation("io.ktor:ktor-client-cio:3.3.2")
36+
implementation("io.ktor:ktor-client-content-negotiation:3.3.2")
37+
implementation("io.ktor:ktor-serialization-jackson:3.3.2")
3538

3639
// Test Dependencies
3740
testImplementation("io.kotest:kotest-runner-junit5:5.9.1")

src/main/kotlin/ch/srgssr/pillarbox/monitoring/DataTransferApplicationRunner.kt

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@ package ch.srgssr.pillarbox.monitoring
22

33
import ch.srgssr.pillarbox.monitoring.event.EventDispatcherClient
44
import ch.srgssr.pillarbox.monitoring.event.setup.OpenSearchSetupService
5+
import ch.srgssr.pillarbox.monitoring.exception.HttpClientException
56
import ch.srgssr.pillarbox.monitoring.log.logger
67
import kotlinx.coroutines.runBlocking
78
import org.springframework.boot.ApplicationArguments
89
import org.springframework.boot.ApplicationRunner
910
import org.springframework.context.annotation.Profile
1011
import org.springframework.stereotype.Component
11-
import org.springframework.web.reactive.function.client.WebClientResponseException
1212

1313
/**
1414
* DataTransferApplicationRunner is responsible for initializing the OpenSearch setup and
@@ -48,13 +48,8 @@ class DataTransferApplicationRunner(
4848
openSearchSetupService.start()
4949
logger.info("All setup tasks are completed, starting SSE client...")
5050
eventDispatcherClient.start().join()
51-
} catch (e: WebClientResponseException) {
52-
logger.error(
53-
"OpenSearch connection failed " +
54-
"| [Status Code: ${e.statusCode.value()}] " +
55-
"| [Body: ${e.responseBodyAsString}]",
56-
e,
57-
)
51+
} catch (e: HttpClientException) {
52+
logger.error("OpenSearch connection failed", e)
5853
} catch (e: Exception) {
5954
logger.error("OpenSearch setup failed due to an unexpected error", e)
6055
} finally {

src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/EventDispatcherClient.kt

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import ch.srgssr.pillarbox.monitoring.benchmark.timed
55
import ch.srgssr.pillarbox.monitoring.cache.LRUCache
66
import ch.srgssr.pillarbox.monitoring.event.model.EventRequest
77
import ch.srgssr.pillarbox.monitoring.event.repository.EventRepository
8+
import ch.srgssr.pillarbox.monitoring.exception.HttpClientException
89
import ch.srgssr.pillarbox.monitoring.flow.chunked
910
import ch.srgssr.pillarbox.monitoring.log.info
1011
import ch.srgssr.pillarbox.monitoring.log.logger
@@ -18,7 +19,6 @@ import kotlinx.coroutines.flow.launchIn
1819
import kotlinx.coroutines.flow.map
1920
import kotlinx.coroutines.flow.onEach
2021
import org.springframework.stereotype.Service
21-
import org.springframework.web.reactive.function.client.WebClientResponseException
2222

2323
/**
2424
* Service responsible for consuming events from a remote event dispatcher service via Server-Sent Events (SSE),
@@ -98,13 +98,8 @@ class EventDispatcherClient(
9898
timed("EventRepository.saveEvents") {
9999
eventRepository.saveAll(events)
100100
}
101-
} catch (e: WebClientResponseException) {
102-
logger.error(
103-
"A connection error occurred while saving the current batch " +
104-
"| [Status Code: ${e.statusCode.value()}] " +
105-
"| [Body: ${e.responseBodyAsString}]",
106-
e,
107-
)
101+
} catch (e: HttpClientException) {
102+
logger.error("An connection error occurred while saving the current batch", e)
108103
} catch (e: Exception) {
109104
logger.error("An unexpected error occurred while saving the current batch", e)
110105
}
Lines changed: 47 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,21 @@
11
package ch.srgssr.pillarbox.monitoring.event
22

33
import ch.srgssr.pillarbox.monitoring.event.model.EventRequest
4+
import ch.srgssr.pillarbox.monitoring.log.error
45
import ch.srgssr.pillarbox.monitoring.log.logger
6+
import ch.srgssr.pillarbox.monitoring.log.warn
7+
import com.fasterxml.jackson.databind.ObjectMapper
8+
import io.ktor.client.HttpClient
9+
import io.ktor.client.engine.cio.CIO
10+
import io.ktor.client.plugins.defaultRequest
11+
import io.ktor.client.plugins.sse.sse
12+
import io.ktor.http.ContentType
13+
import io.ktor.http.contentType
14+
import kotlinx.coroutines.channels.awaitClose
515
import kotlinx.coroutines.flow.Flow
16+
import kotlinx.coroutines.flow.callbackFlow
617
import kotlinx.coroutines.flow.retryWhen
718
import org.springframework.stereotype.Component
8-
import org.springframework.web.reactive.function.client.WebClient
9-
import org.springframework.web.reactive.function.client.bodyToFlow
1019

1120
/**
1221
* Provides a reactive [Flow] of [EventRequest]s by connecting to a remote Server-Sent Events (SSE) endpoint.
@@ -17,12 +26,12 @@ import org.springframework.web.reactive.function.client.bodyToFlow
1726
* - Applying retry logic in case of transient failures, with support for logging retry attempts.
1827
*
1928
* @property properties The SSE client configuration including URI and retry strategy.
20-
* @property webClientBuilder A builder for constructing the [WebClient] used to make SSE requests.
29+
* @property objectMapper
2130
*/
2231
@Component
2332
class EventFlowProvider(
2433
private val properties: EventDispatcherClientConfiguration,
25-
webClientBuilder: WebClient.Builder,
34+
private val objectMapper: ObjectMapper,
2635
) {
2736
private companion object {
2837
/**
@@ -31,26 +40,45 @@ class EventFlowProvider(
3140
private val logger = logger()
3241
}
3342

34-
private val webClient = webClientBuilder.baseUrl(properties.uri.toString()).build()
43+
private val httpClient =
44+
HttpClient(CIO) {
45+
defaultRequest {
46+
url(properties.uri.toString())
47+
contentType(ContentType.Application.Json)
48+
}
49+
}
3550

3651
/**
3752
* Creates and returns a [Flow] of [EventRequest]s from the SSE endpoint.
3853
*
3954
* @return A [Flow] that emits [EventRequest]s received from the remote SSE endpoint.
4055
*/
56+
@Suppress("TooGenericExceptionCaught")
4157
fun start(): Flow<EventRequest> =
42-
webClient
43-
.get()
44-
.retrieve()
45-
.bodyToFlow<EventRequest>()
46-
.retryWhen(
47-
properties.sseRetry.toRetryWhen(
48-
onRetry = { cause, attempt, delayMillis ->
49-
logger.warn(
50-
"Retrying after failure: ${cause.message}. " +
51-
"Attempt ${attempt + 1}. Waiting for ${delayMillis}ms",
52-
)
53-
},
54-
),
55-
)
58+
callbackFlow {
59+
try {
60+
httpClient.sse("/") {
61+
incoming.collect {
62+
val event = objectMapper.readValue(it.data, EventRequest::class.java)
63+
trySend(event).isSuccess
64+
}
65+
}
66+
} catch (e: Exception) {
67+
logger.error(e) { "SSE connection failed: ${e.message}" }
68+
close(e)
69+
}
70+
71+
awaitClose {
72+
logger.info("SSE flow closed")
73+
httpClient.close()
74+
}
75+
}.retryWhen(
76+
properties.sseRetry.toRetryWhen(
77+
onRetry = { cause, attempt, delayMillis ->
78+
logger.warn(cause) {
79+
"Retrying after failure: ${cause.message}. Attempt ${attempt + 1}. Waiting for ${delayMillis}ms"
80+
}
81+
},
82+
),
83+
)
5684
}

src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/config/RetryProperties.kt

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@ package ch.srgssr.pillarbox.monitoring.event.config
22

33
import kotlinx.coroutines.delay
44
import kotlinx.coroutines.flow.FlowCollector
5-
import reactor.util.retry.Retry
6-
import reactor.util.retry.RetryBackoffSpec
75
import java.time.Duration
86

97
/**
@@ -18,18 +16,6 @@ data class RetryProperties(
1816
val initialInterval: Duration = Duration.ofSeconds(5),
1917
val maxInterval: Duration = Duration.ofMinutes(1),
2018
) {
21-
/**
22-
* Creates a [RetryBackoffSpec] based on the retry properties.
23-
* This specification defines the backoff strategy for retries,
24-
* including the number of attempts and interval timings.
25-
*
26-
* @return A configured [RetryBackoffSpec] for use with Reactor's retry mechanism.
27-
*/
28-
fun create(): RetryBackoffSpec =
29-
Retry
30-
.backoff(maxAttempts, initialInterval)
31-
.maxBackoff(maxInterval)
32-
3319
/**
3420
* Creates a `retryWhen` callback function for Kotlin Flow.
3521
*

src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/EventRepository.kt

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,30 @@
11
package ch.srgssr.pillarbox.monitoring.event.repository
22

33
import ch.srgssr.pillarbox.monitoring.event.model.EventRequest
4+
import ch.srgssr.pillarbox.monitoring.io.onSuccess
5+
import ch.srgssr.pillarbox.monitoring.io.throwOnNotSuccess
46
import ch.srgssr.pillarbox.monitoring.log.debug
57
import ch.srgssr.pillarbox.monitoring.log.error
68
import ch.srgssr.pillarbox.monitoring.log.logger
79
import com.fasterxml.jackson.databind.ObjectMapper
8-
import kotlinx.coroutines.reactor.awaitSingleOrNull
10+
import io.ktor.client.HttpClient
11+
import io.ktor.client.call.body
12+
import io.ktor.client.request.post
13+
import io.ktor.client.request.setBody
14+
import io.ktor.client.request.url
915
import org.springframework.beans.factory.annotation.Qualifier
10-
import org.springframework.http.HttpHeaders
11-
import org.springframework.http.MediaType
1216
import org.springframework.stereotype.Component
13-
import org.springframework.web.reactive.function.client.WebClient
1417

1518
/**
1619
* Repository responsible for sending event data to OpenSearch using the bulk API.
1720
*
18-
* @property webClient The [WebClient] configured to connect to the OpenSearch instance.
21+
* @property httpClient The [HttpClient] configured to connect to the OpenSearch instance.
1922
* @property objectMapper Jackson's [ObjectMapper] used to serialize event objects.
2023
*/
2124
@Component
2225
class EventRepository(
23-
@param:Qualifier("openSearchWebClient")
24-
private val webClient: WebClient,
26+
@param:Qualifier("openSearchHttpClient")
27+
private val httpClient: HttpClient,
2528
private val objectMapper: ObjectMapper,
2629
) {
2730
private companion object {
@@ -43,15 +46,12 @@ class EventRepository(
4346
* @param events A list of events to be indexed.
4447
*/
4548
suspend fun saveAll(events: List<EventRequest>) {
46-
webClient
47-
.post()
48-
.uri("/_bulk")
49-
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
50-
.bodyValue(toNDJson(events))
51-
.retrieve()
52-
.bodyToMono(String::class.java)
53-
.awaitSingleOrNull()
54-
?.let { responseStr ->
49+
httpClient
50+
.post {
51+
url("/_bulk")
52+
setBody(toNDJson(events))
53+
}.onSuccess {
54+
val responseStr = body<String>()
5555
logger.debug { "Bulk response: $responseStr " }
5656

5757
val response = objectMapper.readTree(responseStr)
@@ -67,7 +67,7 @@ class EventRepository(
6767
}
6868
}
6969
}
70-
}
70+
}.throwOnNotSuccess { "Connection error" }
7171
}
7272

7373
/**

src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/OpenSearchConfigurationProperties.kt

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package ch.srgssr.pillarbox.monitoring.event.repository
33
import ch.srgssr.pillarbox.monitoring.event.config.RetryProperties
44
import org.springframework.boot.context.properties.ConfigurationProperties
55
import org.springframework.boot.context.properties.NestedConfigurationProperty
6-
import org.springframework.util.unit.DataSize
76
import java.net.URI
87

98
/**
@@ -14,14 +13,11 @@ import java.net.URI
1413
* @property uri The URI of the OpenSearch server. Defaults to `http://localhost:9200`.
1514
* @property retry Nested configuration properties for retry settings related to OpenSearch operations.
1615
* @property timeout The default timeout for each connection in milliseconds. 10s by default.
17-
* @property maxInMemorySize The maximum size in bytes allowed for buffering response bodies in memory,
18-
* 64 MB by default.
1916
*/
2017
@ConfigurationProperties(prefix = "pillarbox.monitoring.opensearch")
2118
data class OpenSearchConfigurationProperties(
2219
val uri: URI = URI("http://localhost:9200"),
2320
@NestedConfigurationProperty
2421
val retry: RetryProperties = RetryProperties(),
2522
val timeout: Int = 10_000,
26-
val maxInMemorySize: DataSize = DataSize.ofMegabytes(64),
2723
)
Lines changed: 26 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,37 @@
11
package ch.srgssr.pillarbox.monitoring.event.setup
22

3+
import ch.srgssr.pillarbox.monitoring.io.is4xxClientError
34
import ch.srgssr.pillarbox.monitoring.io.loadResourceContent
4-
import ch.srgssr.pillarbox.monitoring.log.error
5+
import ch.srgssr.pillarbox.monitoring.io.onStatus
6+
import ch.srgssr.pillarbox.monitoring.io.onSuccess
7+
import ch.srgssr.pillarbox.monitoring.io.throwOnNotSuccess
58
import ch.srgssr.pillarbox.monitoring.log.info
69
import ch.srgssr.pillarbox.monitoring.log.logger
7-
import kotlinx.coroutines.reactor.awaitSingleOrNull
10+
import io.ktor.client.HttpClient
11+
import io.ktor.client.request.get
12+
import io.ktor.client.request.post
13+
import io.ktor.client.request.setBody
14+
import io.ktor.client.request.url
15+
import io.ktor.http.HttpStatusCode
816
import org.springframework.beans.factory.annotation.Qualifier
917
import org.springframework.core.annotation.Order
1018
import org.springframework.core.io.support.ResourcePatternResolver
11-
import org.springframework.http.HttpHeaders
12-
import org.springframework.http.HttpStatusCode
13-
import org.springframework.http.MediaType
1419
import org.springframework.stereotype.Component
15-
import org.springframework.web.reactive.function.client.WebClient
16-
import reactor.core.publisher.Mono
1720

1821
/**
1922
* Task responsible for setting up a filtered alias in OpenSearch.
2023
*
2124
* This task checks if the specified alias exists in OpenSearch. If it does not, it loads the alias
2225
* configuration from `resources/opensearch/user_events-alias.json` and creates it.
2326
*
24-
* @property webClient WebClient instance used to interact with the OpenSearch API.
27+
* @property httpClient [HttpClient] instance used to interact with the OpenSearch API.
2528
* @property resourceLoader Resource loader used to access the alias configuration JSON file.
2629
*/
2730
@Component
2831
@Order(4)
2932
class AliasSetupTask(
30-
@param:Qualifier("openSearchWebClient")
31-
private val webClient: WebClient,
33+
@param:Qualifier("openSearchHttpClient")
34+
private val httpClient: HttpClient,
3235
private val resourceLoader: ResourcePatternResolver,
3336
) : OpenSearchSetupTask {
3437
private companion object {
@@ -52,33 +55,23 @@ class AliasSetupTask(
5255
for (resource in resources) {
5356
val filename = resource.filename ?: continue
5457
val aliasName = filename.removeSuffix("-alias.json")
55-
checkAndCreateAlias(aliasName).awaitSingleOrNull()
58+
checkAndCreateAlias(aliasName)
5659
}
5760
}
5861

59-
private fun checkAndCreateAlias(aliasName: String): Mono<*> =
60-
webClient
61-
.get()
62-
.uri("/_alias/$aliasName")
63-
.retrieve()
62+
private suspend fun checkAndCreateAlias(aliasName: String) =
63+
httpClient
64+
.get("/_alias/$aliasName")
6465
.onStatus(HttpStatusCode::is4xxClientError) {
65-
logger.info { "Alias '$aliasName' does not exist, creating alias..." }
66-
createAlias(aliasName).then(Mono.empty())
67-
}.onStatus(HttpStatusCode::is2xxSuccessful) {
6866
logger.info { "Alias '$aliasName' already exists, skipping creation." }
69-
Mono.empty()
70-
}.toBodilessEntity()
67+
createAlias(aliasName)
68+
}.onSuccess { logger.info { "Alias '$aliasName' does not exist, creating alias..." } }
7169

72-
private fun createAlias(aliasName: String): Mono<*> {
73-
val indexTemplateJson = resourceLoader.loadResourceContent("classpath:opensearch/$aliasName-alias.json")
74-
return webClient
75-
.post()
76-
.uri("/_aliases")
77-
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
78-
.bodyValue(indexTemplateJson)
79-
.retrieve()
80-
.toBodilessEntity()
81-
.doOnSuccess { logger.info { "Alias '$aliasName' created successfully" } }
82-
.doOnError { e -> logger.error { "Failed to create alias '$aliasName': ${e.message}" } }
83-
}
70+
private suspend fun createAlias(aliasName: String) =
71+
httpClient
72+
.post {
73+
url("/_aliases")
74+
setBody(resourceLoader.loadResourceContent("classpath:opensearch/$aliasName-alias.json"))
75+
}.onSuccess { logger.info { "Alias '$aliasName' created successfully" } }
76+
.throwOnNotSuccess { "Failed to create alias '$aliasName'" }
8477
}

0 commit comments

Comments
 (0)