Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 28 additions & 24 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ import org.jlleitschuh.gradle.ktlint.reporter.ReporterType.PLAIN
import java.util.Properties

plugins {
kotlin("jvm") version "2.2.21"
kotlin("plugin.spring") version "2.2.21"
id("org.springframework.boot") version "3.5.7"
id("io.spring.dependency-management") version "1.1.7"
id("dev.detekt") version ("2.0.0-alpha.1")
id("org.jlleitschuh.gradle.ktlint") version "14.0.1"
id("org.jetbrains.kotlinx.kover") version "0.9.3"
id("com.github.ben-manes.versions") version "0.53.0"
alias(libs.plugins.kotlin.jvm)
alias(libs.plugins.kotlin.spring)
alias(libs.plugins.spring.boot)
alias(libs.plugins.spring.dependency.management)
alias(libs.plugins.detekt)
alias(libs.plugins.ktlint)
alias(libs.plugins.kover)
alias(libs.plugins.versions)
}

group = "ch.srgssr.pillarbox"
Expand All @@ -26,22 +26,23 @@ repositories {

dependencies {
// Dependencies
implementation("org.springframework.boot:spring-boot-starter-webflux")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("nl.basjes.parse.useragent:yauaa:7.32.0")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
implementation(libs.spring.boot.starter.json)
implementation(libs.jackson.module.kotlin)
implementation(libs.kotlin.reflect)
implementation(libs.yauaa)
implementation(libs.kotlin.coroutines.core)
implementation(libs.ktor.client.core)
implementation(libs.ktor.client.cio)

// Test Dependencies
testImplementation("io.kotest:kotest-runner-junit5:5.9.1")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("io.kotest.extensions:kotest-extensions-spring:1.3.0")
testImplementation("org.jetbrains.kotlin:kotlin-test-junit5")
testImplementation("io.mockk:mockk:1.14.6")
testImplementation("com.squareup.okhttp3:mockwebserver:5.3.1")
testImplementation("com.squareup.okhttp3:okhttp:5.3.1")
testRuntimeOnly("org.junit.platform:junit-platform-launcher")
testImplementation(libs.kotest.runner.junit5)
testImplementation(libs.spring.boot.starter.test)
testImplementation(libs.kotest.extensions.spring)
testImplementation(libs.kotlin.test.junit5)
testImplementation(libs.mockk)
testImplementation(libs.mockwebserver)
testImplementation(libs.okhttp)
testRuntimeOnly(libs.junit.platform.launcher)
}

kotlin {
Expand All @@ -51,14 +52,17 @@ kotlin {
}

detekt {
toolVersion = "2.0.0-alpha.1"
toolVersion = libs.versions.detekt.get()
buildUponDefaultConfig = true
allRules = false
config.setFrom("$projectDir/detekt.yml")
}

ktlint {
version.set("1.8.0")
version.set(
libs.versions.ktlint.cli
.get(),
)
debug.set(false)
android.set(false)
outputToConsole.set(true)
Expand Down
48 changes: 48 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
[versions]
#Plugins
kotlin = "2.2.21"
spring-boot = "3.5.7"
spring-deps = "1.1.7"
detekt = "2.0.0-alpha.1"
ktlint = "14.0.1"
ktlint-cli = "1.8.0"
kover = "0.9.3"
versions-plugin = "0.53.0"

# Libs
yauaa = "7.32.0"
ktor = "3.3.2"
kotest = "5.9.1"
kotest-spring = "1.3.0"
mockk = "1.14.6"
okhttp = "5.3.1"

[libraries]

spring-boot-starter-json = { module = "org.springframework.boot:spring-boot-starter-json" }
spring-boot-starter-test = { module = "org.springframework.boot:spring-boot-starter-test" }
kotlin-reflect = { module = "org.jetbrains.kotlin:kotlin-reflect" }
kotlin-coroutines-core = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core"}
jackson-module-kotlin = { module = "com.fasterxml.jackson.module:jackson-module-kotlin" }
yauaa = { module = "nl.basjes.parse.useragent:yauaa", version.ref = "yauaa" }
ktor-client-core = { module = "io.ktor:ktor-client-core", version.ref = "ktor" }
ktor-client-cio = { module = "io.ktor:ktor-client-cio", version.ref = "ktor" }

# Tests
kotest-runner-junit5 = { module = "io.kotest:kotest-runner-junit5", version.ref = "kotest" }
kotest-extensions-spring = { module = "io.kotest.extensions:kotest-extensions-spring", version.ref = "kotest-spring" }
kotlin-test-junit5 = { module = "org.jetbrains.kotlin:kotlin-test-junit5" }
mockk = { module = "io.mockk:mockk", version.ref = "mockk" }
okhttp = { module = "com.squareup.okhttp3:okhttp", version.ref = "okhttp" }
mockwebserver = { module = "com.squareup.okhttp3:mockwebserver", version.ref = "okhttp" }
junit-platform-launcher = { module = "org.junit.platform:junit-platform-launcher" }

[plugins]
kotlin-jvm = { id = "org.jetbrains.kotlin.jvm", version.ref = "kotlin" }
kotlin-spring = { id = "org.jetbrains.kotlin.plugin.spring", version.ref = "kotlin" }
spring-boot = { id = "org.springframework.boot", version.ref = "spring-boot" }
spring-dependency-management = { id = "io.spring.dependency-management", version.ref = "spring-deps" }
detekt = { id = "dev.detekt", version.ref = "detekt" }
ktlint = { id = "org.jlleitschuh.gradle.ktlint", version.ref = "ktlint" }
kover = { id = "org.jetbrains.kotlinx.kover", version.ref = "kover" }
versions = { id = "com.github.ben-manes.versions", version.ref = "versions-plugin" }
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package ch.srgssr.pillarbox.monitoring

import ch.srgssr.pillarbox.monitoring.event.EventDispatcherClient
import ch.srgssr.pillarbox.monitoring.event.setup.OpenSearchSetupService
import ch.srgssr.pillarbox.monitoring.exception.HttpClientException
import ch.srgssr.pillarbox.monitoring.log.logger
import kotlinx.coroutines.runBlocking
import org.springframework.boot.ApplicationArguments
import org.springframework.boot.ApplicationRunner
import org.springframework.context.annotation.Profile
import org.springframework.stereotype.Component
import org.springframework.web.reactive.function.client.WebClientResponseException

/**
* DataTransferApplicationRunner is responsible for initializing the OpenSearch setup and
Expand Down Expand Up @@ -48,13 +48,8 @@ class DataTransferApplicationRunner(
openSearchSetupService.start()
logger.info("All setup tasks are completed, starting SSE client...")
eventDispatcherClient.start().join()
} catch (e: WebClientResponseException) {
logger.error(
"OpenSearch connection failed " +
"| [Status Code: ${e.statusCode.value()}] " +
"| [Body: ${e.responseBodyAsString}]",
e,
)
} catch (e: HttpClientException) {
logger.error("OpenSearch connection failed", e)
} catch (e: Exception) {
logger.error("OpenSearch setup failed due to an unexpected error", e)
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import ch.srgssr.pillarbox.monitoring.benchmark.timed
import ch.srgssr.pillarbox.monitoring.cache.LRUCache
import ch.srgssr.pillarbox.monitoring.event.model.EventRequest
import ch.srgssr.pillarbox.monitoring.event.repository.EventRepository
import ch.srgssr.pillarbox.monitoring.exception.HttpClientException
import ch.srgssr.pillarbox.monitoring.flow.chunked
import ch.srgssr.pillarbox.monitoring.log.info
import ch.srgssr.pillarbox.monitoring.log.logger
Expand All @@ -18,7 +19,6 @@ import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import org.springframework.stereotype.Service
import org.springframework.web.reactive.function.client.WebClientResponseException

/**
* Service responsible for consuming events from a remote event dispatcher service via Server-Sent Events (SSE),
Expand Down Expand Up @@ -98,13 +98,8 @@ class EventDispatcherClient(
timed("EventRepository.saveEvents") {
eventRepository.saveAll(events)
}
} catch (e: WebClientResponseException) {
logger.error(
"A connection error occurred while saving the current batch " +
"| [Status Code: ${e.statusCode.value()}] " +
"| [Body: ${e.responseBodyAsString}]",
e,
)
} catch (e: HttpClientException) {
logger.error("An connection error occurred while saving the current batch", e)
} catch (e: Exception) {
logger.error("An unexpected error occurred while saving the current batch", e)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
package ch.srgssr.pillarbox.monitoring.event

import ch.srgssr.pillarbox.monitoring.event.model.EventRequest
import ch.srgssr.pillarbox.monitoring.log.error
import ch.srgssr.pillarbox.monitoring.log.logger
import ch.srgssr.pillarbox.monitoring.log.warn
import com.fasterxml.jackson.databind.ObjectMapper
import io.ktor.client.HttpClient
import io.ktor.client.engine.cio.CIO
import io.ktor.client.plugins.defaultRequest
import io.ktor.client.plugins.sse.SSE
import io.ktor.client.plugins.sse.sse
import io.ktor.http.ContentType
import io.ktor.http.contentType
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.retryWhen
import org.springframework.stereotype.Component
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.reactive.function.client.bodyToFlow

/**
* Provides a reactive [Flow] of [EventRequest]s by connecting to a remote Server-Sent Events (SSE) endpoint.
Expand All @@ -17,12 +27,12 @@ import org.springframework.web.reactive.function.client.bodyToFlow
* - Applying retry logic in case of transient failures, with support for logging retry attempts.
*
* @property properties The SSE client configuration including URI and retry strategy.
* @property webClientBuilder A builder for constructing the [WebClient] used to make SSE requests.
* @property objectMapper
*/
@Component
class EventFlowProvider(
private val properties: EventDispatcherClientConfiguration,
webClientBuilder: WebClient.Builder,
private val objectMapper: ObjectMapper,
) {
private companion object {
/**
Expand All @@ -31,26 +41,47 @@ class EventFlowProvider(
private val logger = logger()
}

private val webClient = webClientBuilder.baseUrl(properties.uri.toString()).build()
private val httpClient =
HttpClient(CIO) {
install(SSE)

defaultRequest {
url(properties.uri.toString())
contentType(ContentType.Application.Json)
}
}

/**
* Creates and returns a [Flow] of [EventRequest]s from the SSE endpoint.
*
* @return A [Flow] that emits [EventRequest]s received from the remote SSE endpoint.
*/
@Suppress("TooGenericExceptionCaught")
fun start(): Flow<EventRequest> =
webClient
.get()
.retrieve()
.bodyToFlow<EventRequest>()
.retryWhen(
properties.sseRetry.toRetryWhen(
onRetry = { cause, attempt, delayMillis ->
logger.warn(
"Retrying after failure: ${cause.message}. " +
"Attempt ${attempt + 1}. Waiting for ${delayMillis}ms",
)
},
),
)
callbackFlow {
try {
httpClient.sse("") {
incoming.collect {
val event = objectMapper.readValue(it.data, EventRequest::class.java)
trySend(event).isSuccess
}
}
} catch (e: Exception) {
logger.error(e) { "SSE connection failed: ${e.message}" }
close(e)
}

awaitClose {
logger.info("SSE flow closed")
httpClient.close()
}
}.retryWhen(
properties.sseRetry.toRetryWhen(
onRetry = { cause, attempt, delayMillis ->
logger.warn(cause) {
"Retrying after failure: ${cause.message}. Attempt ${attempt + 1}. Waiting for ${delayMillis}ms"
}
},
),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package ch.srgssr.pillarbox.monitoring.event.config

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.FlowCollector
import reactor.util.retry.Retry
import reactor.util.retry.RetryBackoffSpec
import java.time.Duration

/**
Expand All @@ -18,18 +16,6 @@ data class RetryProperties(
val initialInterval: Duration = Duration.ofSeconds(5),
val maxInterval: Duration = Duration.ofMinutes(1),
) {
/**
* Creates a [RetryBackoffSpec] based on the retry properties.
* This specification defines the backoff strategy for retries,
* including the number of attempts and interval timings.
*
* @return A configured [RetryBackoffSpec] for use with Reactor's retry mechanism.
*/
fun create(): RetryBackoffSpec =
Retry
.backoff(maxAttempts, initialInterval)
.maxBackoff(maxInterval)

/**
* Creates a `retryWhen` callback function for Kotlin Flow.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,30 @@
package ch.srgssr.pillarbox.monitoring.event.repository

import ch.srgssr.pillarbox.monitoring.event.model.EventRequest
import ch.srgssr.pillarbox.monitoring.io.onSuccess
import ch.srgssr.pillarbox.monitoring.io.throwOnNotSuccess
import ch.srgssr.pillarbox.monitoring.log.debug
import ch.srgssr.pillarbox.monitoring.log.error
import ch.srgssr.pillarbox.monitoring.log.logger
import com.fasterxml.jackson.databind.ObjectMapper
import kotlinx.coroutines.reactor.awaitSingleOrNull
import io.ktor.client.HttpClient
import io.ktor.client.call.body
import io.ktor.client.request.post
import io.ktor.client.request.setBody
import io.ktor.client.request.url
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.http.HttpHeaders
import org.springframework.http.MediaType
import org.springframework.stereotype.Component
import org.springframework.web.reactive.function.client.WebClient

/**
* Repository responsible for sending event data to OpenSearch using the bulk API.
*
* @property webClient The [WebClient] configured to connect to the OpenSearch instance.
* @property httpClient The [HttpClient] configured to connect to the OpenSearch instance.
* @property objectMapper Jackson's [ObjectMapper] used to serialize event objects.
*/
@Component
class EventRepository(
@param:Qualifier("openSearchWebClient")
private val webClient: WebClient,
@param:Qualifier("openSearchHttpClient")
private val httpClient: HttpClient,
private val objectMapper: ObjectMapper,
) {
private companion object {
Expand All @@ -43,15 +46,12 @@ class EventRepository(
* @param events A list of events to be indexed.
*/
suspend fun saveAll(events: List<EventRequest>) {
webClient
.post()
.uri("/_bulk")
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.bodyValue(toNDJson(events))
.retrieve()
.bodyToMono(String::class.java)
.awaitSingleOrNull()
?.let { responseStr ->
httpClient
.post {
url("/_bulk")
setBody(toNDJson(events))
}.onSuccess {
val responseStr = body<String>()
logger.debug { "Bulk response: $responseStr " }

val response = objectMapper.readTree(responseStr)
Expand All @@ -67,7 +67,7 @@ class EventRepository(
}
}
}
}
}.throwOnNotSuccess { "Connection error" }
}

/**
Expand Down
Loading
Loading