Skip to content

Commit 401dafb

Browse files
committed
feat: support user-scoped execution SSE endpoint
1 parent 7ce01d2 commit 401dafb

File tree

7 files changed

+67
-34
lines changed

7 files changed

+67
-34
lines changed

api/src/main/kotlin/me/snoty/backend/utils/SerializationUtils.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,9 @@ import kotlinx.serialization.json.Json
55
import kotlinx.serialization.serializer
66
import kotlin.reflect.KClass
77

8+
/**
9+
* A hacky way to encode an object to a JSON string using the serializer of its class.
10+
* This allows users to serialize objects whose classes are not known at compile time.
11+
*/
812
@OptIn(InternalSerializationApi::class)
913
fun <T : Any> Json.hackyEncodeToString(it: T) = encodeToString((it::class as KClass<T>).serializer(), it)

api/src/main/kotlin/me/snoty/backend/wiring/flow/execution/FlowExecutionEventService.kt

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,28 +7,33 @@ import me.snoty.backend.integration.config.flow.NodeId
77
import me.snoty.backend.scheduling.FlowTriggerReason
88
import me.snoty.integration.common.wiring.flow.FlowExecutionStatus
99
import me.snoty.integration.common.wiring.flow.NodeLogEntry
10+
import kotlin.uuid.Uuid
1011

1112
@Serializable
12-
sealed class FlowExecutionEvent(@Transient val eventType: String) {
13+
sealed class FlowExecutionEvent(val eventType: String) {
14+
abstract val userId: Uuid
1315
abstract val flowId: NodeId
1416
val timestamp = Clock.System.now()
1517

1618
@Serializable
1719
data class FlowStartedEvent(
20+
override val userId: Uuid,
1821
override val flowId: String,
1922
val jobId: String,
2023
val triggeredBy: FlowTriggerReason,
2124
) : FlowExecutionEvent("FlowStarted")
2225

2326
@Serializable
2427
data class FlowLogEvent(
28+
override val userId: Uuid,
2529
override val flowId: String,
2630
val jobId: String,
2731
val entry: NodeLogEntry,
2832
) : FlowExecutionEvent("FlowLog")
2933

3034
@Serializable
3135
data class FlowEndedEvent(
36+
override val userId: Uuid,
3237
override val flowId: String,
3338
val jobId: String,
3439
val status: FlowExecutionStatus,
@@ -42,4 +47,4 @@ interface FlowExecutionEventService {
4247
* Offers an event. Will be passed on using the database or in-memory channel.
4348
*/
4449
suspend fun offer(event: FlowExecutionEvent)
45-
}
50+
}

src/main/kotlin/me/snoty/backend/integration/flow/execution/FlowRunnerImpl.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ class FlowRunnerImpl(
5151
flowExecutionService.create(jobId, flow._id, triggeredBy)
5252

5353
flowExecutionEventService.offer(FlowExecutionEvent.FlowStartedEvent(
54+
userId = flow.userId,
5455
flowId = flow._id,
5556
jobId = jobId,
5657
triggeredBy = triggeredBy,
@@ -78,6 +79,7 @@ class FlowRunnerImpl(
7879
rootSpan.end()
7980
val status = if (it == null) FlowExecutionStatus.SUCCESS else FlowExecutionStatus.FAILED
8081
flowExecutionEventService.offer(FlowExecutionEvent.FlowEndedEvent(
82+
userId = flow.userId,
8183
flowId = flow._id,
8284
jobId = jobId,
8385
status = status,

src/main/kotlin/me/snoty/backend/integration/flow/execution/FlowTracing.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,11 @@ class FlowTracingImpl(
3232
val flowId = flow._id
3333

3434
val rootSpan = spanBuilder("Flow $flowId")
35+
.setAttribute(USER_ID, flow.userId)
3536
.setAttribute(JOB_ID, jobId)
3637
.setAttribute(FLOW_ID, flowId)
3738
.startSpan()
39+
KMDC.put(USER_ID, flow._id)
3840

3941
return rootSpan
4042
}
@@ -43,8 +45,6 @@ class FlowTracingImpl(
4345
setAttribute(NODE_ID, node._id)
4446
KMDC.put(NODE_ID, node._id)
4547
setAttribute("node.descriptor", node.descriptor)
46-
setAttribute(USER_ID, node.userId)
47-
KMDC.put(USER_ID, node.userId.toString())
4848

4949
if (featureFlags.traceConfig) {
5050
setAttribute("config", json.encodeToString(node.settings))

src/main/kotlin/me/snoty/backend/integration/flow/logging/NodeLogAppender.kt

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,8 @@ import kotlinx.coroutines.launch
99
import kotlinx.coroutines.newSingleThreadContext
1010
import kotlinx.datetime.Instant
1111
import me.snoty.backend.logging.toSLF4JLevel
12-
import me.snoty.backend.observability.APPENDER_LOG_LEVEL
13-
import me.snoty.backend.observability.FLOW_ID
14-
import me.snoty.backend.observability.JOB_ID
15-
import me.snoty.backend.observability.NODE_ID
12+
import me.snoty.backend.observability.*
13+
import me.snoty.backend.utils.toUuid
1614
import me.snoty.backend.wiring.flow.execution.FlowExecutionEvent
1715
import me.snoty.backend.wiring.flow.execution.FlowExecutionEventService
1816
import me.snoty.backend.wiring.flow.execution.FlowExecutionService
@@ -60,7 +58,8 @@ class NodeLogAppender(
6058
scope.launch {
6159
flowExecutionEventService.offer(FlowExecutionEvent.FlowLogEvent(
6260
jobId = jobId,
63-
flowId = eventObject.mdcPropertyMap[FLOW_ID.key] ?: return@launch logger.warn { "No flow ID found in log entry with msg='$message'" },
61+
userId = eventObject.mdcPropertyMap[USER_ID.key]?.toUuid() ?: return@launch logger.error { "No User ID found in log entry with msg='$message'" },
62+
flowId = eventObject.mdcPropertyMap[FLOW_ID.key] ?: return@launch logger.error { "No Flow ID found in log entry with msg='$message'" },
6463
entry = entry,
6564
))
6665

src/main/kotlin/me/snoty/backend/scheduling/jobrunr/node/JobRunrFlowJobHandler.kt

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package me.snoty.backend.scheduling.jobrunr.node
22

33
import kotlinx.coroutines.runBlocking
44
import kotlinx.coroutines.slf4j.MDCContext
5+
import kotlinx.coroutines.withContext
56
import me.snoty.backend.integration.flow.logging.NodeLogAppender
67
import me.snoty.backend.logging.KMDC
78
import me.snoty.backend.observability.APPENDER_LOG_LEVEL
@@ -50,16 +51,18 @@ class JobRunrFlowJobHandler(
5051

5152
KMDC.put(USER_ID, flow.userId.toString())
5253

53-
logger.debug("Processing flow {}", flow)
54+
withContext(MDCContext()) {
55+
logger.debug("Processing flow {}", flow)
5456

55-
flowRunner.execute(
56-
jobId = jobContext.jobId.toString(),
57-
triggeredBy = jobRequest.triggeredBy,
58-
logger = logger,
59-
logLevel = jobRequest.logLevel,
60-
flow = flow,
61-
input = SimpleIntermediateData(jobContext),
62-
)
57+
flowRunner.execute(
58+
jobId = jobContext.jobId.toString(),
59+
triggeredBy = jobRequest.triggeredBy,
60+
logger = logger,
61+
logLevel = jobRequest.logLevel,
62+
flow = flow,
63+
input = SimpleIntermediateData(jobContext),
64+
)
65+
}
6366
}
6467
}
6568
}

src/main/kotlin/me/snoty/backend/server/resources/wiring/flow/FlowExecutionResource.kt

Lines changed: 36 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,43 +6,63 @@ import io.ktor.server.sse.*
66
import io.ktor.sse.*
77
import kotlinx.coroutines.flow.filter
88
import kotlinx.serialization.json.Json
9+
import me.snoty.backend.utils.getUser
10+
import me.snoty.backend.utils.hackyEncodeToString
911
import me.snoty.backend.utils.orNull
1012
import me.snoty.backend.wiring.flow.execution.FlowExecutionEventService
1113
import me.snoty.backend.wiring.flow.execution.FlowExecutionService
1214
import org.koin.ktor.ext.get
1315
import kotlin.time.Duration.Companion.seconds
1416

15-
fun Route.flowExecutionResource() = route("{id}") {
17+
fun Route.flowExecutionResource() {
1618
val flowExecutionService: FlowExecutionService = get()
19+
val flowExecutionEventService: FlowExecutionEventService = get()
20+
val json: Json = get()
21+
22+
route("{id}") {
23+
get("executions") {
24+
val startFrom = call.request.queryParameters["startFrom"]?.orNull()
25+
val limit = call.request.queryParameters["limit"]?.toIntOrNull() ?: 10
26+
val flow = getPersonalFlowOrNull() ?: return@get
27+
val executions = flowExecutionService.query(flowId = flow._id, startFrom = startFrom, limit = limit)
1728

18-
get("executions") {
19-
val startFrom = call.request.queryParameters["startFrom"]?.orNull()
20-
val limit = call.request.queryParameters["limit"]?.toIntOrNull() ?: 10
21-
val flow = getPersonalFlowOrNull() ?: return@get
22-
val executions = flowExecutionService.query(flowId = flow._id, startFrom = startFrom, limit = limit)
29+
call.respond(executions)
30+
}
31+
32+
sse("executions/sse") {
33+
val flow = getPersonalFlowOrNull() ?: return@sse
2334

24-
call.respond(executions)
35+
heartbeat {
36+
period = 10.seconds
37+
event = ServerSentEvent("heartbeat")
38+
}
39+
val eventTypes = call.request.queryParameters["eventTypes"]?.split(",")?.map { it.trim() } ?: emptyList()
40+
41+
flowExecutionEventService.provideBus()
42+
.filter { it.flowId == flow._id }
43+
.filter { eventTypes.isEmpty() || it.eventType in eventTypes }
44+
.collect {
45+
send(
46+
data = json.encodeToString(it),
47+
event = it.eventType,
48+
)
49+
}
50+
}
2551
}
2652

27-
val flowExecutionEventService: FlowExecutionEventService = get()
28-
val json: Json = get()
29-
3053
sse("executions/sse") {
31-
val flow = getPersonalFlowOrNull() ?: return@sse
54+
val user = call.getUser()
3255

3356
heartbeat {
3457
period = 10.seconds
3558
event = ServerSentEvent("heartbeat")
3659
}
3760

38-
val eventTypes = call.request.queryParameters["eventTypes"]?.split(",")?.map { it.trim() } ?: emptyList()
39-
4061
flowExecutionEventService.provideBus()
41-
.filter { it.flowId == flow._id }
42-
.filter { eventTypes.isEmpty() || it.eventType in eventTypes }
62+
.filter { it.userId == user.id }
4363
.collect {
4464
send(
45-
data = json.encodeToString(it),
65+
data = json.hackyEncodeToString(it),
4666
event = it.eventType,
4767
)
4868
}

0 commit comments

Comments
 (0)