Skip to content

Commit d9c784e

Browse files
committed
fix(runner): transitive receive empty input not respected (#225)
Fixes #225
1 parent c31591e commit d9c784e

File tree

5 files changed

+116
-46
lines changed

5 files changed

+116
-46
lines changed

api/src/testFixtures/kotlin/me/snoty/backend/test/FlowHandlers.kt

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import me.snoty.backend.integration.config.flow.NodeId
44
import me.snoty.integration.common.wiring.Node
55
import me.snoty.integration.common.wiring.NodeHandleContext
66
import me.snoty.integration.common.wiring.data.NodeInput
7+
import me.snoty.integration.common.wiring.data.NodeOutput
78
import me.snoty.integration.common.wiring.data.mapInput
89
import me.snoty.integration.common.wiring.iterableStructOutput
910
import me.snoty.integration.common.wiring.node.NodeHandler
@@ -12,6 +13,8 @@ import me.snoty.integration.common.wiring.simpleOutput
1213
const val TYPE_MAP = "map"
1314
const val TYPE_QUOTE = "quote"
1415
const val TYPE_EXCEPTION = "exception"
16+
const val TYPE_WANTS_EMPTY_PROVIDES_NONEMPTY = "wants_empty_provides_nonempty"
17+
const val TYPE_WANTS_NONEMPTY_PROVIDES_EMPTY = "wants_nonempty_provides_empty"
1518

1619
abstract class TestNodeHandler : NodeHandler
1720

@@ -39,12 +42,36 @@ object ExceptionHandler : TestNodeHandler() {
3942
= throw exception
4043
}
4144

42-
class GlobalMapHandler(
43-
private val map: MutableMap<NodeId, Any> = mutableMapOf()
44-
) : TestNodeHandler(), Map<NodeId, Any> by map {
45-
override suspend fun NodeHandleContext.process(node: Node, input: NodeInput) = mapInput<Any>(input) {
46-
map[node._id] = it
45+
open class GlobalMapHandler(
46+
private val map: MutableMap<NodeId, NodeInput> = mutableMapOf()
47+
) : TestNodeHandler(), Map<NodeId, NodeInput> by map {
48+
override suspend fun NodeHandleContext.process(node: Node, input: NodeInput) = processImpl(this, node, input)
49+
50+
fun processImpl(nodeHandleContext: NodeHandleContext, node: Node, input: NodeInput): NodeOutput {
51+
map[node._id] = input
52+
53+
return nodeHandleContext.mapInput<Any>(input) {
54+
nodeHandleContext.simpleOutput(it)
55+
}
56+
}
57+
}
4758

48-
simpleOutput(it)
59+
class WantsEmptyProvidesNonEmptyHandler : GlobalMapHandler() {
60+
companion object {
61+
const val OUTPUT = "non-empty"
4962
}
63+
64+
override suspend fun NodeHandleContext.process(node: Node, input: NodeInput): NodeOutput {
65+
super.processImpl(this, node, input)
66+
return simpleOutput(OUTPUT)
67+
}
68+
}
69+
70+
class WantsNonEmptyProvidesEmptyHandler : GlobalMapHandler() {
71+
override suspend fun NodeHandleContext.process(node: Node, input: NodeInput) =
72+
if (input.isNotEmpty()) {
73+
super.processImpl(this, node, input)
74+
iterableStructOutput<Unit>(emptyList())
75+
}
76+
else error("Expected non-empty input, got: $input")
5077
}

api/src/testFixtures/kotlin/me/snoty/backend/test/TestObjects.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,14 @@ fun nodeMetadata(
7171
descriptor: NodeDescriptor,
7272
position: NodePosition = NodePosition.MIDDLE,
7373
settingsClass: KClass<out NodeSettings> = EmptyNodeSettings::class,
74+
receiveEmptyInput: Boolean = false,
7475
) = NodeMetadata(
7576
descriptor = descriptor,
7677
displayName = "Test Node",
7778
position = position,
7879
settings = emptyList(),
7980
settingsClass = settingsClass,
81+
receiveEmptyInput = receiveEmptyInput,
8082
input = null,
8183
output = null
8284
)
@@ -86,11 +88,13 @@ fun nodeMetadata(
8688
namespace: String = "me.snoty.backend.test",
8789
position: NodePosition = NodePosition.MIDDLE,
8890
settingsClass: KClass<out NodeSettings> = EmptyNodeSettings::class,
91+
receiveEmptyInput: Boolean = false,
8992
) = nodeMetadata(
9093
NodeDescriptor(
9194
namespace = namespace,
9295
name = name
9396
),
9497
position,
9598
settingsClass,
99+
receiveEmptyInput,
96100
)

src/main/kotlin/me/snoty/backend/integration/flow/execution/FlowRunnerImpl.kt

Lines changed: 36 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@ import me.snoty.backend.wiring.flow.execution.FlowExecutionEventService
2424
import me.snoty.backend.wiring.flow.execution.FlowExecutionService
2525
import me.snoty.integration.common.model.NodePosition
2626
import me.snoty.integration.common.wiring.FlowNode
27+
import me.snoty.integration.common.wiring.Node
2728
import me.snoty.integration.common.wiring.NodeHandleContextImpl
2829
import me.snoty.integration.common.wiring.data.IntermediateData
2930
import me.snoty.integration.common.wiring.data.IntermediateDataMapperRegistry
3031
import me.snoty.integration.common.wiring.data.NodeInput
32+
import me.snoty.integration.common.wiring.data.NodeOutput
3133
import me.snoty.integration.common.wiring.flow.FlowExecutionStatus
3234
import me.snoty.integration.common.wiring.flow.FlowRunner
3335
import me.snoty.integration.common.wiring.flow.WorkflowWithNodes
@@ -152,19 +154,43 @@ class FlowRunnerImpl(
152154
visited: List<NodeId>,
153155
depth: Int,
154156
): Flow<Unit> {
155-
val handler = nodeRegistry.lookupHandler(node.descriptor)
156-
?: let {
157-
logger.error { "No handler found for node ${node.descriptor}" }
158-
return emptyFlow()
159-
}
160-
161157
if (node._id in visited) {
162158
val referencingNodes = visited
163159
.filter { nodeMap[it]?.next?.contains(node._id) == true }
164160
logger.error { "Cycle detected at node ${node.descriptor} (${node._id}, referenced by $referencingNodes)" }
165161
return emptyFlow()
166162
}
167163

164+
val nodeLogName = "${node.descriptor.name} node \"${node.settings.name}\" (${node._id})"
165+
166+
val handler = nodeRegistry.lookupHandler(node.descriptor)
167+
?: let {
168+
logger.error { "No handler found for $nodeLogName" }
169+
return emptyFlow()
170+
}
171+
172+
val metadata = nodeRegistry.getMetadata(node.descriptor)
173+
174+
fun Node.executeNextNodes(input: NodeOutput) = node.next
175+
.asFlow()
176+
.mapNotNull { nextNodeId ->
177+
nodeMap[nextNodeId] ?: let {
178+
logger.error { "Next node $nextNodeId not found" }
179+
null
180+
}
181+
}
182+
.flatMapConcat { nextNode ->
183+
val subspan = span.subspan(flowTracing, traceName(nextNode)) {
184+
setNodeAttributes(nextNode, input)
185+
}
186+
executeImpl(subspan, nextNode, input, visited + node._id, depth + 1)
187+
}
188+
189+
if (metadata.position != NodePosition.START && input.isEmpty() && !metadata.receiveEmptyInput) {
190+
logger.debug { "Skipping $nodeLogName because it does not receive empty input." }
191+
return node.executeNextNodes(emptyList()).onCompletion { span.end() }
192+
}
193+
168194
val context = NodeHandleContextImpl(
169195
intermediateDataMapperRegistry = intermediateDataMapperRegistry,
170196
logger = logger.underlyingLogger,
@@ -173,11 +199,11 @@ class FlowRunnerImpl(
173199
KMDC.put(APPENDER_LOG_LEVEL, (node.logLevel ?: logLevel).name)
174200

175201
return flow {
176-
logger.debug { "Processing ${node.descriptor.name} node \"${node.settings.name}\" (${node._id}) with $input" }
202+
logger.debug { "Processing $nodeLogName with $input" }
177203
// pls fix Kotlin
178204
val data = with(context) { with(handler) { process(node, input) } }
179-
logger.debug { "Processed ${node.descriptor.name} node \"${node.settings.name}\" (${node._id})" }
180-
if (nodeRegistry.getMetadata(node.descriptor).position.logOutput && node.next.isEmpty()) {
205+
logger.debug { "Processed $nodeLogName" }
206+
if (metadata.position.logOutput && node.next.isEmpty()) {
181207
logger.debug { "Node \"${node.settings.name}\" (${node._id}) has no output nodes, would have emitted $data" }
182208
}
183209

@@ -192,21 +218,7 @@ class FlowRunnerImpl(
192218
}
193219
.flowOn(span.asContextElement() + MDCContext())
194220
.flatMapConcat { output ->
195-
node.next
196-
.asFlow()
197-
.mapNotNull { nextNodeId ->
198-
nodeMap[nextNodeId] ?: let {
199-
logger.error { "Next node $nextNodeId not found" }
200-
null
201-
}
202-
}
203-
.filter { nextNode -> !output.isEmpty() || nodeRegistry.getMetadata(nextNode.descriptor).receiveEmptyInput }
204-
.flatMapConcat { nextNode ->
205-
val subspan = span.subspan(flowTracing, traceName(nextNode)) {
206-
setNodeAttributes(nextNode, output)
207-
}
208-
executeImpl(subspan, nextNode, output, visited + node._id, depth + 1)
209-
}
221+
node.executeNextNodes(output) // current Node's output is the next Node's input
210222
}
211223
.onCompletion {
212224
span.end()

src/test/kotlin/me/snoty/backend/integration/flow/FlowRunnerImplTest.kt

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import me.snoty.backend.wiring.flow.execution.FlowExecutionEventService
2424
import me.snoty.backend.wiring.node.NodeRegistryImpl
2525
import me.snoty.integration.common.snotyJson
2626
import me.snoty.integration.common.wiring.data.IntermediateData
27+
import me.snoty.integration.common.wiring.data.NodeInput
2728
import me.snoty.integration.common.wiring.data.impl.SimpleIntermediateData
2829
import me.snoty.integration.common.wiring.flow.Workflow
2930
import me.snoty.integration.common.wiring.flow.WorkflowWithNodes
@@ -53,13 +54,17 @@ class FlowRunnerImplTest {
5354
}
5455
}
5556

56-
fun nodeMetadata(name: String) = nodeMetadata(namespace = namespace, name = name)
57+
fun nodeMetadata(name: String, receiveEmptyInput: Boolean) = nodeMetadata(namespace = namespace, name = name, receiveEmptyInput = receiveEmptyInput)
5758

5859
private val mapHandler = GlobalMapHandler()
60+
private val wantsEmptyProvidesNonEmptyHandler = WantsEmptyProvidesNonEmptyHandler()
61+
private val wantsNonEmptyProvidesEmptyHandler = WantsNonEmptyProvidesEmptyHandler()
5962
private val nodeRegistry = NodeRegistryImpl().apply {
60-
registerHandler(nodeMetadata(name = TYPE_MAP), mapHandler)
61-
registerHandler(nodeMetadata(name = TYPE_QUOTE), QuoteHandler)
62-
registerHandler(nodeMetadata(name = TYPE_EXCEPTION), ExceptionHandler)
63+
registerHandler(nodeMetadata(name = TYPE_MAP, false), mapHandler)
64+
registerHandler(nodeMetadata(name = TYPE_QUOTE, false), QuoteHandler)
65+
registerHandler(nodeMetadata(name = TYPE_EXCEPTION, false), ExceptionHandler)
66+
registerHandler(nodeMetadata(name = TYPE_WANTS_EMPTY_PROVIDES_NONEMPTY, true), wantsEmptyProvidesNonEmptyHandler)
67+
registerHandler(nodeMetadata(name = TYPE_WANTS_NONEMPTY_PROVIDES_EMPTY, false), wantsNonEmptyProvidesEmptyHandler)
6368
registerEmitHandler()
6469
}
6570
private val otel = createOpenTelemetry()
@@ -76,11 +81,11 @@ class FlowRunnerImplTest {
7681
appender.start()
7782
}
7883

79-
private suspend fun FlowRunnerImpl.executeStartNode(jobId: String, flow: WorkflowWithNodes, input: IntermediateData) {
84+
private suspend fun FlowRunnerImpl.executeStartNode(jobId: String, flow: WorkflowWithNodes, input: NodeInput) {
8085
// set in the scheduler
8186
KMDC.put(JOB_ID, jobId)
8287
withContext(MDCContext()) {
83-
execute(jobId, FlowTriggerReason.Unknown, logger, Level.DEBUG, flow, listOf(input))
88+
execute(jobId, FlowTriggerReason.Unknown, logger, Level.DEBUG, flow, input)
8489
}
8590
}
8691

@@ -108,9 +113,9 @@ class FlowRunnerImplTest {
108113
val emit = emitNode(node)
109114
val flow = relationalFlow(emit, node)
110115
val jobId = "basic"
111-
runner.executeStartNode(jobId, flow, intermediateData)
116+
runner.executeStartNode(jobId, flow, listOf(intermediateData))
112117
assertNoWarnings(flow)
113-
assertEquals(intermediateDataRaw, mapHandler[node._id])
118+
assertEquals(listOf(intermediateData), mapHandler[node._id])
114119
val spans = otel.spanExporter.finishedSpanItems
115120
.sortedBy { it.startEpochNanos }
116121

@@ -130,9 +135,9 @@ class FlowRunnerImplTest {
130135
val emit = emitNode(processor)
131136
val flow = relationalFlow(emit, processor, map)
132137

133-
runner.executeStartNode("basic withQuote", flow, intermediateData)
138+
runner.executeStartNode("basic withQuote", flow, listOf(intermediateData))
134139
assertNoWarnings(flow)
135-
assertEquals("'test'", mapHandler[map._id])
140+
assertEquals("'test'", mapHandler[map._id]?.single()?.value)
136141

137142
val spans = otel.spanExporter.finishedSpanItems
138143
assertEquals(4, spans.size)
@@ -149,9 +154,9 @@ class FlowRunnerImplTest {
149154
val flow = relationalFlow(emit, node)
150155

151156
suspend fun verifyTrace(flow: WorkflowWithNodes, input: IntermediateData, withConfig: Boolean) {
152-
runner.executeStartNode("traces config attribute", flow, input)
157+
runner.executeStartNode("traces config attribute", flow, listOf(input))
153158
assertNoWarnings(flow)
154-
assertEquals(input.value, mapHandler[node._id])
159+
assertEquals(input, mapHandler[node._id]?.single())
155160
val spans = otel.spanExporter.finishedSpanItems
156161
.sortedBy { it.startEpochNanos }
157162

@@ -185,7 +190,7 @@ class FlowRunnerImplTest {
185190
val flow = relationalFlow(emit, mapNode, exNode)
186191

187192
assertThrows<FlowExecutionException> {
188-
runner.executeStartNode("traces exception attributes", flow, intermediateData)
193+
runner.executeStartNode("traces exception attributes", flow, listOf(intermediateData))
189194
}
190195
assertNull(mapHandler[exNode._id])
191196
val spans = otel.spanExporter.finishedSpanItems
@@ -220,4 +225,28 @@ class FlowRunnerImplTest {
220225
assertEquals(FlowExecutionException::class.qualifiedName, rootExceptionEvent.attributes.get(ExceptionAttributes.EXCEPTION_TYPE))
221226
assertNotNull(rootExceptionEvent.attributes.get(ExceptionAttributes.EXCEPTION_MESSAGE))
222227
}
228+
229+
@Test
230+
fun `test receive empty input transitively - #225`() = runBlocking {
231+
val nodex = node(NodeDescriptor(namespace, TYPE_WANTS_EMPTY_PROVIDES_NONEMPTY))
232+
val nodex1 = node(NodeDescriptor(namespace, TYPE_QUOTE), next = listOf(nodex))
233+
val nodex2 = node(NodeDescriptor(namespace, TYPE_WANTS_EMPTY_PROVIDES_NONEMPTY), next = listOf(nodex1))
234+
val nodex3 = node(NodeDescriptor(namespace, TYPE_WANTS_NONEMPTY_PROVIDES_EMPTY), next = listOf(nodex2)) // also skipped
235+
val nodex4 = node(NodeDescriptor(namespace, TYPE_WANTS_NONEMPTY_PROVIDES_EMPTY), next = listOf(nodex3)) // skipped
236+
val emit = emitNode(nodex4) // won't emit anything
237+
238+
val flow = relationalFlow(emit, nodex4, nodex3, nodex2, nodex1, nodex)
239+
println(flow.nodes.joinToString("\n") { "${it._id}: ${it.descriptor.name}" })
240+
241+
runner.executeStartNode("receive empty input transitively", flow, emptyList())
242+
243+
assertNoWarnings(flow)
244+
assertFalse(nodex4._id in wantsNonEmptyProvidesEmptyHandler)
245+
assertFalse(nodex3._id in wantsNonEmptyProvidesEmptyHandler)
246+
assertEquals(emptyList<SimpleIntermediateData>(), wantsEmptyProvidesNonEmptyHandler[nodex2._id])
247+
assertEquals(
248+
listOf(SimpleIntermediateData("'${WantsEmptyProvidesNonEmptyHandler.OUTPUT}'")),
249+
wantsEmptyProvidesNonEmptyHandler[nodex._id]
250+
)
251+
}
223252
}

src/test/kotlin/me/snoty/backend/integration/flow/FlowTestUtils.kt

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import me.snoty.integration.common.wiring.flow.WorkflowWithNodes
1414
import me.snoty.integration.common.wiring.node.NodeDescriptor
1515
import me.snoty.integration.common.wiring.node.NodeHandler
1616
import me.snoty.integration.common.wiring.node.NodeRegistry
17-
import me.snoty.integration.common.wiring.simpleOutput
1817
import kotlin.uuid.Uuid
1918

2019
fun relationalFlow(
@@ -37,8 +36,7 @@ object EmitHandler : NodeHandler {
3736
NodePosition.START,
3837
)
3938

40-
override suspend fun NodeHandleContext.process(node: Node, input: NodeInput)
41-
= simpleOutput("test")
39+
override suspend fun NodeHandleContext.process(node: Node, input: NodeInput) = input
4240
}
4341
fun NodeRegistry.registerEmitHandler() {
4442
registerHandler(EmitHandler.metadata, EmitHandler)

0 commit comments

Comments
 (0)