diff --git a/build.gradle.kts b/build.gradle.kts index d905918..1dd0eaf 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -2,14 +2,14 @@ import org.jlleitschuh.gradle.ktlint.reporter.ReporterType.PLAIN import java.util.Properties plugins { - kotlin("jvm") version "2.2.21" - kotlin("plugin.spring") version "2.2.21" - id("org.springframework.boot") version "3.5.7" - id("io.spring.dependency-management") version "1.1.7" - id("dev.detekt") version ("2.0.0-alpha.1") - id("org.jlleitschuh.gradle.ktlint") version "14.0.1" - id("org.jetbrains.kotlinx.kover") version "0.9.3" - id("com.github.ben-manes.versions") version "0.53.0" + alias(libs.plugins.kotlin.jvm) + alias(libs.plugins.kotlin.spring) + alias(libs.plugins.spring.boot) + alias(libs.plugins.spring.dependency.management) + alias(libs.plugins.detekt) + alias(libs.plugins.ktlint) + alias(libs.plugins.kover) + alias(libs.plugins.versions) } group = "ch.srgssr.pillarbox" @@ -26,22 +26,23 @@ repositories { dependencies { // Dependencies - implementation("org.springframework.boot:spring-boot-starter-webflux") - implementation("com.fasterxml.jackson.module:jackson-module-kotlin") - implementation("org.jetbrains.kotlin:kotlin-reflect") - implementation("nl.basjes.parse.useragent:yauaa:7.32.0") - implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core") - implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor") + implementation(libs.spring.boot.starter.json) + implementation(libs.jackson.module.kotlin) + implementation(libs.kotlin.reflect) + implementation(libs.yauaa) + implementation(libs.kotlin.coroutines.core) + implementation(libs.ktor.client.core) + implementation(libs.ktor.client.cio) // Test Dependencies - testImplementation("io.kotest:kotest-runner-junit5:5.9.1") - testImplementation("org.springframework.boot:spring-boot-starter-test") - testImplementation("io.kotest.extensions:kotest-extensions-spring:1.3.0") - testImplementation("org.jetbrains.kotlin:kotlin-test-junit5") - testImplementation("io.mockk:mockk:1.14.6") - testImplementation("com.squareup.okhttp3:mockwebserver:5.3.1") - testImplementation("com.squareup.okhttp3:okhttp:5.3.1") - testRuntimeOnly("org.junit.platform:junit-platform-launcher") + testImplementation(libs.kotest.runner.junit5) + testImplementation(libs.spring.boot.starter.test) + testImplementation(libs.kotest.extensions.spring) + testImplementation(libs.kotlin.test.junit5) + testImplementation(libs.mockk) + testImplementation(libs.mockwebserver) + testImplementation(libs.okhttp) + testRuntimeOnly(libs.junit.platform.launcher) } kotlin { @@ -51,14 +52,17 @@ kotlin { } detekt { - toolVersion = "2.0.0-alpha.1" + toolVersion = libs.versions.detekt.get() buildUponDefaultConfig = true allRules = false config.setFrom("$projectDir/detekt.yml") } ktlint { - version.set("1.8.0") + version.set( + libs.versions.ktlint.cli + .get(), + ) debug.set(false) android.set(false) outputToConsole.set(true) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml new file mode 100644 index 0000000..bf317a3 --- /dev/null +++ b/gradle/libs.versions.toml @@ -0,0 +1,48 @@ +[versions] +#Plugins +kotlin = "2.2.21" +spring-boot = "3.5.7" +spring-deps = "1.1.7" +detekt = "2.0.0-alpha.1" +ktlint = "14.0.1" +ktlint-cli = "1.8.0" +kover = "0.9.3" +versions-plugin = "0.53.0" + +# Libs +yauaa = "7.32.0" +ktor = "3.3.2" +kotest = "5.9.1" +kotest-spring = "1.3.0" +mockk = "1.14.6" +okhttp = "5.3.1" + +[libraries] + +spring-boot-starter-json = { module = "org.springframework.boot:spring-boot-starter-json" } +spring-boot-starter-test = { module = "org.springframework.boot:spring-boot-starter-test" } +kotlin-reflect = { module = "org.jetbrains.kotlin:kotlin-reflect" } +kotlin-coroutines-core = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core"} +jackson-module-kotlin = { module = "com.fasterxml.jackson.module:jackson-module-kotlin" } +yauaa = { module = "nl.basjes.parse.useragent:yauaa", version.ref = "yauaa" } +ktor-client-core = { module = "io.ktor:ktor-client-core", version.ref = "ktor" } +ktor-client-cio = { module = "io.ktor:ktor-client-cio", version.ref = "ktor" } + +# Tests +kotest-runner-junit5 = { module = "io.kotest:kotest-runner-junit5", version.ref = "kotest" } +kotest-extensions-spring = { module = "io.kotest.extensions:kotest-extensions-spring", version.ref = "kotest-spring" } +kotlin-test-junit5 = { module = "org.jetbrains.kotlin:kotlin-test-junit5" } +mockk = { module = "io.mockk:mockk", version.ref = "mockk" } +okhttp = { module = "com.squareup.okhttp3:okhttp", version.ref = "okhttp" } +mockwebserver = { module = "com.squareup.okhttp3:mockwebserver", version.ref = "okhttp" } +junit-platform-launcher = { module = "org.junit.platform:junit-platform-launcher" } + +[plugins] +kotlin-jvm = { id = "org.jetbrains.kotlin.jvm", version.ref = "kotlin" } +kotlin-spring = { id = "org.jetbrains.kotlin.plugin.spring", version.ref = "kotlin" } +spring-boot = { id = "org.springframework.boot", version.ref = "spring-boot" } +spring-dependency-management = { id = "io.spring.dependency-management", version.ref = "spring-deps" } +detekt = { id = "dev.detekt", version.ref = "detekt" } +ktlint = { id = "org.jlleitschuh.gradle.ktlint", version.ref = "ktlint" } +kover = { id = "org.jetbrains.kotlinx.kover", version.ref = "kover" } +versions = { id = "com.github.ben-manes.versions", version.ref = "versions-plugin" } diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/DataTransferApplicationRunner.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/DataTransferApplicationRunner.kt index 48cfb2f..8851372 100644 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/DataTransferApplicationRunner.kt +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/DataTransferApplicationRunner.kt @@ -2,13 +2,13 @@ package ch.srgssr.pillarbox.monitoring import ch.srgssr.pillarbox.monitoring.event.EventDispatcherClient import ch.srgssr.pillarbox.monitoring.event.setup.OpenSearchSetupService +import ch.srgssr.pillarbox.monitoring.exception.HttpClientException import ch.srgssr.pillarbox.monitoring.log.logger import kotlinx.coroutines.runBlocking import org.springframework.boot.ApplicationArguments import org.springframework.boot.ApplicationRunner import org.springframework.context.annotation.Profile import org.springframework.stereotype.Component -import org.springframework.web.reactive.function.client.WebClientResponseException /** * DataTransferApplicationRunner is responsible for initializing the OpenSearch setup and @@ -48,13 +48,8 @@ class DataTransferApplicationRunner( openSearchSetupService.start() logger.info("All setup tasks are completed, starting SSE client...") eventDispatcherClient.start().join() - } catch (e: WebClientResponseException) { - logger.error( - "OpenSearch connection failed " + - "| [Status Code: ${e.statusCode.value()}] " + - "| [Body: ${e.responseBodyAsString}]", - e, - ) + } catch (e: HttpClientException) { + logger.error("OpenSearch connection failed", e) } catch (e: Exception) { logger.error("OpenSearch setup failed due to an unexpected error", e) } finally { diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/EventDispatcherClient.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/EventDispatcherClient.kt index c4a20f8..84e20f1 100644 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/EventDispatcherClient.kt +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/EventDispatcherClient.kt @@ -5,6 +5,7 @@ import ch.srgssr.pillarbox.monitoring.benchmark.timed import ch.srgssr.pillarbox.monitoring.cache.LRUCache import ch.srgssr.pillarbox.monitoring.event.model.EventRequest import ch.srgssr.pillarbox.monitoring.event.repository.EventRepository +import ch.srgssr.pillarbox.monitoring.exception.HttpClientException import ch.srgssr.pillarbox.monitoring.flow.chunked import ch.srgssr.pillarbox.monitoring.log.info import ch.srgssr.pillarbox.monitoring.log.logger @@ -18,7 +19,6 @@ import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onEach import org.springframework.stereotype.Service -import org.springframework.web.reactive.function.client.WebClientResponseException /** * Service responsible for consuming events from a remote event dispatcher service via Server-Sent Events (SSE), @@ -98,13 +98,8 @@ class EventDispatcherClient( timed("EventRepository.saveEvents") { eventRepository.saveAll(events) } - } catch (e: WebClientResponseException) { - logger.error( - "A connection error occurred while saving the current batch " + - "| [Status Code: ${e.statusCode.value()}] " + - "| [Body: ${e.responseBodyAsString}]", - e, - ) + } catch (e: HttpClientException) { + logger.error("An connection error occurred while saving the current batch", e) } catch (e: Exception) { logger.error("An unexpected error occurred while saving the current batch", e) } diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/EventFlowProvider.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/EventFlowProvider.kt index 2d30355..037b390 100644 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/EventFlowProvider.kt +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/EventFlowProvider.kt @@ -1,12 +1,22 @@ package ch.srgssr.pillarbox.monitoring.event import ch.srgssr.pillarbox.monitoring.event.model.EventRequest +import ch.srgssr.pillarbox.monitoring.log.error import ch.srgssr.pillarbox.monitoring.log.logger +import ch.srgssr.pillarbox.monitoring.log.warn +import com.fasterxml.jackson.databind.ObjectMapper +import io.ktor.client.HttpClient +import io.ktor.client.engine.cio.CIO +import io.ktor.client.plugins.defaultRequest +import io.ktor.client.plugins.sse.SSE +import io.ktor.client.plugins.sse.sse +import io.ktor.http.ContentType +import io.ktor.http.contentType +import kotlinx.coroutines.channels.awaitClose import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.callbackFlow import kotlinx.coroutines.flow.retryWhen import org.springframework.stereotype.Component -import org.springframework.web.reactive.function.client.WebClient -import org.springframework.web.reactive.function.client.bodyToFlow /** * Provides a reactive [Flow] of [EventRequest]s by connecting to a remote Server-Sent Events (SSE) endpoint. @@ -17,12 +27,12 @@ import org.springframework.web.reactive.function.client.bodyToFlow * - Applying retry logic in case of transient failures, with support for logging retry attempts. * * @property properties The SSE client configuration including URI and retry strategy. - * @property webClientBuilder A builder for constructing the [WebClient] used to make SSE requests. + * @property objectMapper */ @Component class EventFlowProvider( private val properties: EventDispatcherClientConfiguration, - webClientBuilder: WebClient.Builder, + private val objectMapper: ObjectMapper, ) { private companion object { /** @@ -31,26 +41,47 @@ class EventFlowProvider( private val logger = logger() } - private val webClient = webClientBuilder.baseUrl(properties.uri.toString()).build() + private val httpClient = + HttpClient(CIO) { + install(SSE) + + defaultRequest { + url(properties.uri.toString()) + contentType(ContentType.Application.Json) + } + } /** * Creates and returns a [Flow] of [EventRequest]s from the SSE endpoint. * * @return A [Flow] that emits [EventRequest]s received from the remote SSE endpoint. */ + @Suppress("TooGenericExceptionCaught") fun start(): Flow = - webClient - .get() - .retrieve() - .bodyToFlow() - .retryWhen( - properties.sseRetry.toRetryWhen( - onRetry = { cause, attempt, delayMillis -> - logger.warn( - "Retrying after failure: ${cause.message}. " + - "Attempt ${attempt + 1}. Waiting for ${delayMillis}ms", - ) - }, - ), - ) + callbackFlow { + try { + httpClient.sse("") { + incoming.collect { + val event = objectMapper.readValue(it.data, EventRequest::class.java) + trySend(event).isSuccess + } + } + } catch (e: Exception) { + logger.error(e) { "SSE connection failed: ${e.message}" } + close(e) + } + + awaitClose { + logger.info("SSE flow closed") + httpClient.close() + } + }.retryWhen( + properties.sseRetry.toRetryWhen( + onRetry = { cause, attempt, delayMillis -> + logger.warn(cause) { + "Retrying after failure: ${cause.message}. Attempt ${attempt + 1}. Waiting for ${delayMillis}ms" + } + }, + ), + ) } diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/config/RetryProperties.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/config/RetryProperties.kt index 95a03bd..e3c6b16 100644 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/config/RetryProperties.kt +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/config/RetryProperties.kt @@ -2,8 +2,6 @@ package ch.srgssr.pillarbox.monitoring.event.config import kotlinx.coroutines.delay import kotlinx.coroutines.flow.FlowCollector -import reactor.util.retry.Retry -import reactor.util.retry.RetryBackoffSpec import java.time.Duration /** @@ -18,18 +16,6 @@ data class RetryProperties( val initialInterval: Duration = Duration.ofSeconds(5), val maxInterval: Duration = Duration.ofMinutes(1), ) { - /** - * Creates a [RetryBackoffSpec] based on the retry properties. - * This specification defines the backoff strategy for retries, - * including the number of attempts and interval timings. - * - * @return A configured [RetryBackoffSpec] for use with Reactor's retry mechanism. - */ - fun create(): RetryBackoffSpec = - Retry - .backoff(maxAttempts, initialInterval) - .maxBackoff(maxInterval) - /** * Creates a `retryWhen` callback function for Kotlin Flow. * diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/EventRepository.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/EventRepository.kt index 7749149..91c626c 100644 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/EventRepository.kt +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/EventRepository.kt @@ -1,27 +1,30 @@ package ch.srgssr.pillarbox.monitoring.event.repository import ch.srgssr.pillarbox.monitoring.event.model.EventRequest +import ch.srgssr.pillarbox.monitoring.io.onSuccess +import ch.srgssr.pillarbox.monitoring.io.throwOnNotSuccess import ch.srgssr.pillarbox.monitoring.log.debug import ch.srgssr.pillarbox.monitoring.log.error import ch.srgssr.pillarbox.monitoring.log.logger import com.fasterxml.jackson.databind.ObjectMapper -import kotlinx.coroutines.reactor.awaitSingleOrNull +import io.ktor.client.HttpClient +import io.ktor.client.call.body +import io.ktor.client.request.post +import io.ktor.client.request.setBody +import io.ktor.client.request.url import org.springframework.beans.factory.annotation.Qualifier -import org.springframework.http.HttpHeaders -import org.springframework.http.MediaType import org.springframework.stereotype.Component -import org.springframework.web.reactive.function.client.WebClient /** * Repository responsible for sending event data to OpenSearch using the bulk API. * - * @property webClient The [WebClient] configured to connect to the OpenSearch instance. + * @property httpClient The [HttpClient] configured to connect to the OpenSearch instance. * @property objectMapper Jackson's [ObjectMapper] used to serialize event objects. */ @Component class EventRepository( - @param:Qualifier("openSearchWebClient") - private val webClient: WebClient, + @param:Qualifier("openSearchHttpClient") + private val httpClient: HttpClient, private val objectMapper: ObjectMapper, ) { private companion object { @@ -43,15 +46,12 @@ class EventRepository( * @param events A list of events to be indexed. */ suspend fun saveAll(events: List) { - webClient - .post() - .uri("/_bulk") - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .bodyValue(toNDJson(events)) - .retrieve() - .bodyToMono(String::class.java) - .awaitSingleOrNull() - ?.let { responseStr -> + httpClient + .post { + url("/_bulk") + setBody(toNDJson(events)) + }.onSuccess { + val responseStr = body() logger.debug { "Bulk response: $responseStr " } val response = objectMapper.readTree(responseStr) @@ -67,7 +67,7 @@ class EventRepository( } } } - } + }.throwOnNotSuccess { "Connection error" } } /** diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/OpenSearchConfigurationProperties.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/OpenSearchConfigurationProperties.kt index 6dc7f86..87ff2c5 100644 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/OpenSearchConfigurationProperties.kt +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/OpenSearchConfigurationProperties.kt @@ -3,7 +3,6 @@ package ch.srgssr.pillarbox.monitoring.event.repository import ch.srgssr.pillarbox.monitoring.event.config.RetryProperties import org.springframework.boot.context.properties.ConfigurationProperties import org.springframework.boot.context.properties.NestedConfigurationProperty -import org.springframework.util.unit.DataSize import java.net.URI /** @@ -14,8 +13,6 @@ import java.net.URI * @property uri The URI of the OpenSearch server. Defaults to `http://localhost:9200`. * @property retry Nested configuration properties for retry settings related to OpenSearch operations. * @property timeout The default timeout for each connection in milliseconds. 10s by default. - * @property maxInMemorySize The maximum size in bytes allowed for buffering response bodies in memory, - * 64 MB by default. */ @ConfigurationProperties(prefix = "pillarbox.monitoring.opensearch") data class OpenSearchConfigurationProperties( @@ -23,5 +20,4 @@ data class OpenSearchConfigurationProperties( @NestedConfigurationProperty val retry: RetryProperties = RetryProperties(), val timeout: Int = 10_000, - val maxInMemorySize: DataSize = DataSize.ofMegabytes(64), ) diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/AliasSetupTask.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/AliasSetupTask.kt index 6db8b91..0066a31 100644 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/AliasSetupTask.kt +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/AliasSetupTask.kt @@ -1,19 +1,22 @@ package ch.srgssr.pillarbox.monitoring.event.setup +import ch.srgssr.pillarbox.monitoring.io.is4xxClientError import ch.srgssr.pillarbox.monitoring.io.loadResourceContent -import ch.srgssr.pillarbox.monitoring.log.error +import ch.srgssr.pillarbox.monitoring.io.onStatus +import ch.srgssr.pillarbox.monitoring.io.onSuccess +import ch.srgssr.pillarbox.monitoring.io.throwOnNotSuccess import ch.srgssr.pillarbox.monitoring.log.info import ch.srgssr.pillarbox.monitoring.log.logger -import kotlinx.coroutines.reactor.awaitSingleOrNull +import io.ktor.client.HttpClient +import io.ktor.client.request.get +import io.ktor.client.request.post +import io.ktor.client.request.setBody +import io.ktor.client.request.url +import io.ktor.http.HttpStatusCode import org.springframework.beans.factory.annotation.Qualifier import org.springframework.core.annotation.Order import org.springframework.core.io.support.ResourcePatternResolver -import org.springframework.http.HttpHeaders -import org.springframework.http.HttpStatusCode -import org.springframework.http.MediaType import org.springframework.stereotype.Component -import org.springframework.web.reactive.function.client.WebClient -import reactor.core.publisher.Mono /** * Task responsible for setting up a filtered alias in OpenSearch. @@ -21,14 +24,14 @@ import reactor.core.publisher.Mono * This task checks if the specified alias exists in OpenSearch. If it does not, it loads the alias * configuration from `resources/opensearch/user_events-alias.json` and creates it. * - * @property webClient WebClient instance used to interact with the OpenSearch API. + * @property httpClient [HttpClient] instance used to interact with the OpenSearch API. * @property resourceLoader Resource loader used to access the alias configuration JSON file. */ @Component @Order(4) class AliasSetupTask( - @param:Qualifier("openSearchWebClient") - private val webClient: WebClient, + @param:Qualifier("openSearchHttpClient") + private val httpClient: HttpClient, private val resourceLoader: ResourcePatternResolver, ) : OpenSearchSetupTask { private companion object { @@ -52,33 +55,23 @@ class AliasSetupTask( for (resource in resources) { val filename = resource.filename ?: continue val aliasName = filename.removeSuffix("-alias.json") - checkAndCreateAlias(aliasName).awaitSingleOrNull() + checkAndCreateAlias(aliasName) } } - private fun checkAndCreateAlias(aliasName: String): Mono<*> = - webClient - .get() - .uri("/_alias/$aliasName") - .retrieve() + private suspend fun checkAndCreateAlias(aliasName: String) = + httpClient + .get("/_alias/$aliasName") .onStatus(HttpStatusCode::is4xxClientError) { - logger.info { "Alias '$aliasName' does not exist, creating alias..." } - createAlias(aliasName).then(Mono.empty()) - }.onStatus(HttpStatusCode::is2xxSuccessful) { logger.info { "Alias '$aliasName' already exists, skipping creation." } - Mono.empty() - }.toBodilessEntity() + createAlias(aliasName) + }.onSuccess { logger.info { "Alias '$aliasName' does not exist, creating alias..." } } - private fun createAlias(aliasName: String): Mono<*> { - val indexTemplateJson = resourceLoader.loadResourceContent("classpath:opensearch/$aliasName-alias.json") - return webClient - .post() - .uri("/_aliases") - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .bodyValue(indexTemplateJson) - .retrieve() - .toBodilessEntity() - .doOnSuccess { logger.info { "Alias '$aliasName' created successfully" } } - .doOnError { e -> logger.error { "Failed to create alias '$aliasName': ${e.message}" } } - } + private suspend fun createAlias(aliasName: String) = + httpClient + .post { + url("/_aliases") + setBody(resourceLoader.loadResourceContent("classpath:opensearch/$aliasName-alias.json")) + }.onSuccess { logger.info { "Alias '$aliasName' created successfully" } } + .throwOnNotSuccess { "Failed to create alias '$aliasName'" } } diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/ISMPolicySetupTask.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/ISMPolicySetupTask.kt index d30c81a..f5d2a4f 100644 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/ISMPolicySetupTask.kt +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/ISMPolicySetupTask.kt @@ -1,18 +1,21 @@ package ch.srgssr.pillarbox.monitoring.event.setup +import ch.srgssr.pillarbox.monitoring.io.is4xxClientError import ch.srgssr.pillarbox.monitoring.io.loadResourceContent -import ch.srgssr.pillarbox.monitoring.log.error +import ch.srgssr.pillarbox.monitoring.io.onStatus +import ch.srgssr.pillarbox.monitoring.io.onSuccess +import ch.srgssr.pillarbox.monitoring.io.throwOnNotSuccess +import ch.srgssr.pillarbox.monitoring.log.info import ch.srgssr.pillarbox.monitoring.log.logger -import kotlinx.coroutines.reactor.awaitSingleOrNull +import io.ktor.client.HttpClient +import io.ktor.client.request.get +import io.ktor.client.request.put +import io.ktor.client.request.setBody +import io.ktor.client.request.url import org.springframework.beans.factory.annotation.Qualifier import org.springframework.core.annotation.Order import org.springframework.core.io.support.ResourcePatternResolver -import org.springframework.http.HttpHeaders -import org.springframework.http.HttpStatusCode -import org.springframework.http.MediaType import org.springframework.stereotype.Component -import org.springframework.web.reactive.function.client.WebClient -import reactor.core.publisher.Mono /** * Task responsible for setting up the Index State Management (ISM) policy in OpenSearch. @@ -20,14 +23,14 @@ import reactor.core.publisher.Mono * This task checks if the ISM policy already exists in OpenSearch. If it does not, it loads * the ISM policy configuration from `resources/opensearch/heartbeat_events-policy.json`. * - * @property webClient WebClient instance for interacting with the OpenSearch API. + * @property httpClient HttpClient instance used to interact with the OpenSearch API. * @property resourceLoader Resource loader for accessing the ISM policy configuration file. */ @Component @Order(1) class ISMPolicySetupTask( - @param:Qualifier("openSearchWebClient") - private val webClient: WebClient, + @param:Qualifier("openSearchHttpClient") + private val httpClient: HttpClient, private val resourceLoader: ResourcePatternResolver, ) : OpenSearchSetupTask { private companion object { @@ -51,34 +54,26 @@ class ISMPolicySetupTask( for (resource in resources) { val filename = resource.filename ?: continue val policyName = filename.removeSuffix("-policy.json") - checkAndApplyISMPolicy(policyName).awaitSingleOrNull() + checkAndApplyISMPolicy(policyName) } } - private fun checkAndApplyISMPolicy(policyName: String): Mono<*> = - webClient - .get() - .uri("/_plugins/_ism/policies/${policyName}_policy") - .retrieve() - .onStatus(HttpStatusCode::is4xxClientError) { - logger.info("ISM policy '${policyName}_policy' does not exist, creating new ISM policy...") - applyISMPolicy(policyName).then(Mono.empty()) - }.onStatus(HttpStatusCode::is2xxSuccessful) { - logger.info("ISM policy '${policyName}_policy' already exists, skipping creation.") - Mono.empty() - }.toBodilessEntity() + private suspend fun checkAndApplyISMPolicy(policyName: String) = + httpClient + .get("/_plugins/_ism/policies/${policyName}_policy") + .onStatus(io.ktor.http.HttpStatusCode::is4xxClientError) { + logger.info { "ISM policy '${policyName}_policy' does not exist, creating new ISM policy..." } + applyISMPolicy(policyName) + }.onSuccess { + logger.info { "ISM policy '${policyName}_policy' already exists, skipping creation." } + } - private fun applyISMPolicy(policyName: String): Mono<*> { - val ismPolicyJson = resourceLoader.loadResourceContent("classpath:opensearch/$policyName-policy.json") - - return webClient - .put() - .uri("/_plugins/_ism/policies/${policyName}_policy") - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .bodyValue(ismPolicyJson) - .retrieve() - .toBodilessEntity() - .doOnSuccess { logger.info("ISM Policy '${policyName}_policy' applied successfully") } - .doOnError { e -> logger.error { "Failed to apply ISM Policy '${policyName}_policy': ${e.message}" } } - } + private suspend fun applyISMPolicy(policyName: String) = + httpClient + .put { + url("/_plugins/_ism/policies/${policyName}_policy") + setBody(resourceLoader.loadResourceContent("classpath:opensearch/$policyName-policy.json")) + }.onSuccess { + logger.info { "ISM Policy '${policyName}_policy' applied successfully" } + }.throwOnNotSuccess { "Failed to apply ISM Policy '${policyName}_policy'" } } diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/IndexSetupTask.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/IndexSetupTask.kt index b69f77f..2468cad 100644 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/IndexSetupTask.kt +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/IndexSetupTask.kt @@ -1,18 +1,22 @@ package ch.srgssr.pillarbox.monitoring.event.setup +import ch.srgssr.pillarbox.monitoring.io.is4xxClientError import ch.srgssr.pillarbox.monitoring.io.loadResourceContent -import ch.srgssr.pillarbox.monitoring.log.error +import ch.srgssr.pillarbox.monitoring.io.onStatus +import ch.srgssr.pillarbox.monitoring.io.onSuccess +import ch.srgssr.pillarbox.monitoring.io.throwOnNotSuccess +import ch.srgssr.pillarbox.monitoring.log.info import ch.srgssr.pillarbox.monitoring.log.logger -import kotlinx.coroutines.reactor.awaitSingleOrNull +import io.ktor.client.HttpClient +import io.ktor.client.request.head +import io.ktor.client.request.put +import io.ktor.client.request.setBody +import io.ktor.client.request.url +import io.ktor.http.HttpStatusCode import org.springframework.beans.factory.annotation.Qualifier import org.springframework.core.annotation.Order import org.springframework.core.io.support.ResourcePatternResolver -import org.springframework.http.HttpHeaders -import org.springframework.http.HttpStatusCode -import org.springframework.http.MediaType import org.springframework.stereotype.Component -import org.springframework.web.reactive.function.client.WebClient -import reactor.core.publisher.Mono /** * Task responsible for setting up an OpenSearch index if it does not already exist. @@ -21,14 +25,14 @@ import reactor.core.publisher.Mono * does not exist, it creates the index from the template stored * in `resources/opensearch/core_events-template.json`. * - * @property webClient WebClient instance used to interact with the OpenSearch API. + * @property httpClient HttpClient instance used to interact with the OpenSearch API. * @property resourceLoader Resource loader used to access the index template JSON file. */ @Component @Order(3) class IndexSetupTask( - @param:Qualifier("openSearchWebClient") - private val webClient: WebClient, + @param:Qualifier("openSearchHttpClient") + private val httpClient: HttpClient, private val resourceLoader: ResourcePatternResolver, ) : OpenSearchSetupTask { private companion object { @@ -52,33 +56,23 @@ class IndexSetupTask( for (resource in resources) { val filename = resource.filename ?: continue val indexName = filename.removeSuffix("-index.json") - checkAndCreateIndex(indexName).awaitSingleOrNull() + checkAndCreateIndex(indexName) } } - private fun checkAndCreateIndex(indexName: String): Mono<*> = - webClient - .head() - .uri("/$indexName") - .retrieve() + private suspend fun checkAndCreateIndex(indexName: String) = + httpClient + .head("/$indexName") .onStatus(HttpStatusCode::is4xxClientError) { - logger.info("Index '$indexName' does not exist, creating index...") - createIndex(indexName).then(Mono.empty()) - }.onStatus(HttpStatusCode::is2xxSuccessful) { - logger.info("Index '$indexName' already exists, skipping creation.") - Mono.empty() - }.toBodilessEntity() + logger.info { "Index '$indexName' does not exist, creating index..." } + createIndex(indexName) + }.onSuccess { logger.info { "Index '$indexName' already exists, skipping creation." } } - private fun createIndex(indexName: String): Mono<*> { - val indexJson = resourceLoader.loadResourceContent("classpath:opensearch/$indexName-index.json") - return webClient - .put() - .uri("/$indexName-000001") - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .bodyValue(indexJson) - .retrieve() - .toBodilessEntity() - .doOnSuccess { logger.info("Index '$indexName' created successfully") } - .doOnError { e -> logger.error { "Failed to create index '$indexName': ${e.message}" } } - } + private suspend fun createIndex(indexName: String) = + httpClient + .put { + url("/$indexName-000001") + setBody(resourceLoader.loadResourceContent("classpath:opensearch/$indexName-index.json")) + }.onSuccess { logger.info { "Index '$indexName' created successfully" } } + .throwOnNotSuccess { "Failed to create index '$indexName'" } } diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/IndexTemplateSetupTask.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/IndexTemplateSetupTask.kt index 3d91748..8c2f973 100644 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/IndexTemplateSetupTask.kt +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/IndexTemplateSetupTask.kt @@ -1,32 +1,36 @@ package ch.srgssr.pillarbox.monitoring.event.setup +import ch.srgssr.pillarbox.monitoring.io.is4xxClientError import ch.srgssr.pillarbox.monitoring.io.loadResourceContent -import ch.srgssr.pillarbox.monitoring.log.error +import ch.srgssr.pillarbox.monitoring.io.onStatus +import ch.srgssr.pillarbox.monitoring.io.onSuccess +import ch.srgssr.pillarbox.monitoring.io.throwOnNotSuccess +import ch.srgssr.pillarbox.monitoring.log.info import ch.srgssr.pillarbox.monitoring.log.logger -import kotlinx.coroutines.reactor.awaitSingleOrNull +import io.ktor.client.HttpClient +import io.ktor.client.request.get +import io.ktor.client.request.put +import io.ktor.client.request.setBody +import io.ktor.client.request.url +import io.ktor.http.HttpStatusCode import org.springframework.beans.factory.annotation.Qualifier import org.springframework.core.annotation.Order import org.springframework.core.io.support.ResourcePatternResolver -import org.springframework.http.HttpHeaders -import org.springframework.http.HttpStatusCode -import org.springframework.http.MediaType import org.springframework.stereotype.Component -import org.springframework.web.reactive.function.client.WebClient -import reactor.core.publisher.Mono /** * Task responsible for setting up the OpenSearch index template. * * This task creates the index template stored in `resources/opensearch/core_events-template.json`. * - * @property webClient WebClient instance used to interact with the OpenSearch API. + * @property httpClient HttpClient instance used to interact with the OpenSearch API. * @property resourceLoader Resource loader used to access the index template JSON file. */ @Component @Order(2) class IndexTemplateSetupTask( - @param:Qualifier("openSearchWebClient") - private val webClient: WebClient, + @param:Qualifier("openSearchHttpClient") + private val httpClient: HttpClient, private val resourceLoader: ResourcePatternResolver, ) : OpenSearchSetupTask { private companion object { @@ -50,33 +54,25 @@ class IndexTemplateSetupTask( for (resource in resources) { val filename = resource.filename ?: continue val templateName = filename.removeSuffix("-template.json") - checkAndCreateTemplate(templateName).awaitSingleOrNull() + checkAndCreateTemplate(templateName) } } - private fun checkAndCreateTemplate(templateName: String): Mono<*> = - webClient - .get() - .uri("/_index_template/${templateName}_template") - .retrieve() + private suspend fun checkAndCreateTemplate(templateName: String) = + httpClient + .get("/_index_template/${templateName}_template") .onStatus(HttpStatusCode::is4xxClientError) { - logger.info("Index template '${templateName}_template' does not exist, creating it...") - createTemplate(templateName).then(Mono.empty()) - }.onStatus(HttpStatusCode::is2xxSuccessful) { - logger.info("Index template '${templateName}_template' already exists, skipping creation.") - Mono.empty() - }.toBodilessEntity() + logger.info { "Index template '${templateName}_template' does not exist, creating it..." } + createTemplate(templateName) + }.onSuccess { + logger.info { "Index template '${templateName}_template' already exists, skipping creation." } + } - private fun createTemplate(templateName: String): Mono<*> { - val indexTemplateJson = resourceLoader.loadResourceContent("classpath:opensearch/$templateName-template.json") - return webClient - .put() - .uri("/_index_template/${templateName}_template") - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .bodyValue(indexTemplateJson) - .retrieve() - .toBodilessEntity() - .doOnSuccess { logger.info("Index template '${templateName}_template' created successfully") } - .doOnError { e -> logger.error { "Failed to create index template '${templateName}_template': ${e.message}" } } - } + private suspend fun createTemplate(templateName: String) = + httpClient + .put { + url("/_index_template/${templateName}_template") + setBody(resourceLoader.loadResourceContent("classpath:opensearch/$templateName-template.json")) + }.onSuccess { logger.info { "Index template '${templateName}_template' created successfully" } } + .throwOnNotSuccess { "Failed to create index template '${templateName}_template'" } } diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/OpenSearchSetupConfiguration.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/OpenSearchSetupConfiguration.kt index 5d53394..a16c498 100644 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/OpenSearchSetupConfiguration.kt +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/OpenSearchSetupConfiguration.kt @@ -1,46 +1,38 @@ package ch.srgssr.pillarbox.monitoring.event.setup import ch.srgssr.pillarbox.monitoring.event.repository.OpenSearchConfigurationProperties -import io.netty.channel.ChannelOption +import io.ktor.client.HttpClient +import io.ktor.client.engine.cio.CIO +import io.ktor.client.plugins.HttpTimeout +import io.ktor.client.plugins.defaultRequest +import io.ktor.http.ContentType +import io.ktor.http.contentType import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration -import org.springframework.http.client.reactive.ReactorClientHttpConnector -import org.springframework.web.reactive.function.client.ExchangeStrategies -import org.springframework.web.reactive.function.client.WebClient -import reactor.netty.http.client.HttpClient /** * Configuration class for OpenSearch setup. * - * Provides a WebClient bean configured with the OpenSearch URI. + * Provides a [HttpClient] bean configured with the OpenSearch URI. */ @Configuration class OpenSearchSetupConfiguration { - /** - * Creates a WebClient bean for OpenSearch using the specified URI from the properties. - * - * @param properties OpenSearch configuration properties containing the URI. - * @return Configured WebClient instance for OpenSearch. - */ - @Bean("openSearchWebClient") - fun openSearchWebClient(properties: OpenSearchConfigurationProperties): WebClient { - val httpClient = - HttpClient - .create() - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, properties.timeout) + @Bean("openSearchHttpClient") + fun openSearchHttpClient(properties: OpenSearchConfigurationProperties): HttpClient = + HttpClient(CIO) { + engine { + requestTimeout = properties.timeout.toLong() // milliseconds + } - return WebClient - .builder() - .baseUrl(properties.uri.toString()) - .clientConnector(ReactorClientHttpConnector(httpClient)) - .exchangeStrategies( - ExchangeStrategies - .builder() - .codecs { - it.defaultCodecs().maxInMemorySize( - properties.maxInMemorySize.toBytes().toInt(), - ) - }.build(), - ).build() - } + install(HttpTimeout) { + requestTimeoutMillis = properties.timeout.toLong() + connectTimeoutMillis = properties.timeout.toLong() + socketTimeoutMillis = properties.timeout.toLong() + } + + defaultRequest { + url(properties.uri.toString()) + contentType(ContentType.Application.Json) + } + } } diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/OpenSearchSetupService.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/OpenSearchSetupService.kt index 16b8908..63fa131 100644 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/OpenSearchSetupService.kt +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/OpenSearchSetupService.kt @@ -1,12 +1,18 @@ package ch.srgssr.pillarbox.monitoring.event.setup import ch.srgssr.pillarbox.monitoring.event.repository.OpenSearchConfigurationProperties +import ch.srgssr.pillarbox.monitoring.io.onSuccess +import ch.srgssr.pillarbox.monitoring.io.throwOnNotSuccess import ch.srgssr.pillarbox.monitoring.log.info import ch.srgssr.pillarbox.monitoring.log.logger -import kotlinx.coroutines.reactor.awaitSingleOrNull +import ch.srgssr.pillarbox.monitoring.log.warn +import io.ktor.client.HttpClient +import io.ktor.client.request.get +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.retryWhen import org.springframework.beans.factory.annotation.Qualifier import org.springframework.stereotype.Service -import org.springframework.web.reactive.function.client.WebClient /** * Service responsible for setting up the OpenSearch environment and ensuring @@ -15,14 +21,14 @@ import org.springframework.web.reactive.function.client.WebClient * * Discovers all [OpenSearchSetupTask] in the context and executes them sequentially. * - * @property webClient The web client instance configured for OpenSearch. + * @property httpClient HttpClient instance used to interact with the OpenSearch API. * @property tasks The list of setup tasks that must be executed to prepare the OpenSearch environment. * @property properties OpenSearch configuration properties including the URI and retry settings. */ @Service class OpenSearchSetupService( - @param:Qualifier("openSearchWebClient") - private val webClient: WebClient, + @param:Qualifier("openSearchHttpClient") + private val httpClient: HttpClient, private val tasks: List, private val properties: OpenSearchConfigurationProperties, ) { @@ -47,17 +53,22 @@ class OpenSearchSetupService( } private suspend fun checkOpenSearchHealth() { - webClient - .get() - .uri("/") - .retrieve() - .toBodilessEntity() - .retryWhen( - properties.retry.create().doBeforeRetry { - logger.info("Retrying OpenSearch health check...") + flow { + emit( + httpClient + .get("/") + .onSuccess { logger.info("OpenSearch is healthy, proceeding with setup...") } + .throwOnNotSuccess { "Connection error while checking OpenSearch health" }, + ) + }.retryWhen( + properties.retry.toRetryWhen( + onRetry = { cause, attempt, delayMillis -> + logger.warn(cause) { + "Retrying after failure: ${cause.message}. Attempt ${attempt + 1}. Waiting for ${delayMillis}ms" + } }, - ).doOnSuccess { logger.info("OpenSearch is healthy, proceeding with setup...") } - .awaitSingleOrNull() + ), + ).collect() } private suspend fun runSetupTasks() { diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/exception/HttpClientException.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/exception/HttpClientException.kt new file mode 100644 index 0000000..cb3ffc9 --- /dev/null +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/exception/HttpClientException.kt @@ -0,0 +1,12 @@ +package ch.srgssr.pillarbox.monitoring.exception + +/** + * Exception thrown when an HTTP client request fails or returns an unexpected response. + * + * @param message A descriptive message explaining the reason for the exception. + * @param cause The underlying exception that caused the failur, if available. This can be null. + */ +class HttpClientException( + message: String, + cause: Throwable? = null, +) : RuntimeException(message, cause) diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/exception/RetryExhaustedException.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/exception/RetryExhaustedException.kt deleted file mode 100644 index 4a5d39c..0000000 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/exception/RetryExhaustedException.kt +++ /dev/null @@ -1,16 +0,0 @@ -package ch.srgssr.pillarbox.monitoring.exception - -/** - * Exception thrown when a retry mechanism has exhausted its allowed number of attempts. - * - * The actual exception used by Spring Boot's reactive WebClient for retry exhaustion is private, - * making it unavailable for type checking. This custom exception is created to provide clarity and - * allow proper handling of retry exhaustion scenarios. - * - * @param message A descriptive message explaining the reason for the exception. - * @param cause The underlying exception that caused the failur, if available. This can be null. - */ -class RetryExhaustedException( - message: String, - cause: Throwable?, -) : RuntimeException(message, cause) diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/io/HttpUtils.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/io/HttpUtils.kt new file mode 100644 index 0000000..2c81b97 --- /dev/null +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/io/HttpUtils.kt @@ -0,0 +1,59 @@ +package ch.srgssr.pillarbox.monitoring.io + +import ch.srgssr.pillarbox.monitoring.exception.HttpClientException +import io.ktor.client.call.body +import io.ktor.client.statement.HttpResponse +import io.ktor.http.HttpStatusCode +import io.ktor.http.isSuccess + +/** + * Checks whether the HTTP status code represents a client error (4xx). + * + * @return `true` if the status code is in the range 400..499, `false` otherwise. + */ +fun HttpStatusCode.is4xxClientError(): Boolean = value in (400..499) + +/** + * Executes the given [block] if the HTTP response status is successful (2xx). + * + * @param block The lambda to execute on successful response. + * @return The original [HttpResponse], allowing chaining of further operations. + */ +inline fun HttpResponse.onSuccess(block: HttpResponse.() -> Unit) = apply { if (status.isSuccess()) block() } + +/** + * Executes the given [block] if the HTTP response status matches the provided [predicate]. + * + * @param predicate A lambda that returns `true` for the statuses you want to handle. + * @param block The lambda to execute if [predicate] returns `true`. + * @return The original [HttpResponse], allowing chaining of further operations. + */ +inline fun HttpResponse.onStatus( + predicate: (HttpStatusCode) -> Boolean, + block: HttpResponse.() -> Unit, +) = apply { if (predicate(status)) block() } + +/** + * Executes the given [block] if the HTTP response status is not successful (non-2xx). + * + * @param block The lambda to execute on non-successful response. + * @return The original [HttpResponse], allowing chaining of further operations. + */ +inline fun HttpResponse.onNotSuccess(block: HttpResponse.() -> Unit) = apply { if (!status.isSuccess()) block() } + +/** + * Throws an [HttpClientException] if the HTTP response is not successful. + * + * The exception message will include the result of [lazyMessage], the HTTP status code, + * and the response body (if available). + * + * @param lazyMessage A lambda providing a custom error message. + * @throws HttpClientException if the response status is not successful. + */ +suspend inline fun HttpResponse.throwOnNotSuccess(lazyMessage: () -> String) = + onNotSuccess { + val errorBody = runCatching { body() }.getOrDefault("") + val fullMessage = "${lazyMessage()}: HTTP $status - $errorBody" + + throw HttpClientException(fullMessage) + }