@ClemDoum commented Oct 31, 2025

Addresses ICIJ/datashare#1913

Spike description

Dynamically batched task workflow

The goal of this spike is to validate that Kestra supports workflows in which the processing of an input of unknown size is split into subtasks of fixed size.

Implementation

In Kestra, splitting an input of arbitrary size is supported through the ForEachItem task and subflows (instead of subtasks).
It's also possible to do the same without subflows, but this is not recommended for tasks that spawn a lot of subtasks.
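
To make the batching semantics concrete, here is a minimal Python sketch of the idea, not of Kestra itself: an input of unknown length is consumed lazily and cut into fixed-size batches, and each batch is handed to a separate processing step (the role played by the subflow in the spike). The names `batched`, `run_in_batches` and the `dummy_processing` function are hypothetical and only chosen to mirror the flow names in this PR.

```python
from itertools import islice
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def batched(items: Iterable[T], batch_size: int) -> Iterator[list[T]]:
    """Yield consecutive lists of at most `batch_size` items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(items)
    # islice consumes lazily, so the total input size is never needed up front.
    while batch := list(islice(iterator, batch_size)):
        yield batch


def run_in_batches(
    items: Iterable[T], batch_size: int, process_batch: Callable[[list[T]], R]
) -> list[R]:
    """Call `process_batch` once per batch (stand-in for one subflow run each)."""
    return [process_batch(batch) for batch in batched(items, batch_size)]


def dummy_processing(batch: list[str]) -> int:
    """Hypothetical per-batch work: report how many items were handled."""
    return len(batch)


if __name__ == "__main__":
    # Seven items with a batch size of three: the last batch is smaller.
    print(run_in_batches(iter("abcdefg"), 3, dummy_processing))
```

Only the last batch can be smaller than the requested size, and an empty input produces no batches at all.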

How to replicate

  • run a Kestra standalone server
  • paste the workflow definitions (dynamic_batch_flow.yml and dummy_processing.yml)
  • run the dynamic_batch_flow

Outcome

Positive:

  • Kestra is great and very helpful for creating flows (schema validation is super helpful)
  • flow output debugging is also very helpful

Overall, the spike is a success.

@ClemDoum changed the title from "Spike(core)/dynamic batch dag" to "spike(core)/dynamic batch dag" Oct 31, 2025
@github-actions

Tests report quick summary:

failed ❌ > tests: 3115, success: 3103, skipped: 11, failed: 1

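| Project | Status | Success | Skipped | Failed |
| --- | --- | --- | --- | --- |
| cli | success ✅ | 74 | 0 | 0 |
| core | success ✅ | 1626 | 0 | 0 |
| executor | success ✅ | 3 | 0 | 0 |
| jdbc | success ✅ | 10 | 0 | 0 |
| jdbc-h2 | success ✅ | 435 | 0 | 0 |
| jdbc-mysql | success ✅ | 437 | 0 | 0 |
| jdbc-postgres | failed ❌ | 436 | 0 | 1 |
| processor | success ✅ | 7 | 0 | 0 |
| runner-memory | success ✅ | 1 | 0 | 0 |
| scheduler | success ✅ | 23 | 0 | 0 |
| storage-local | success ✅ | 62 | 0 | 0 |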

Failed tests:

jdbc-postgres > io.kestra.runner.postgres.PostgresServiceLivenessCoordinatorTest > shouldReEmitTasksToTheSameWorkerGroup() failed ❌ in 10.198
org.opentest4j.AssertionFailedError: shouldReEmitTasksToTheSameWorkerGroup: resubmitLatchAwait was not OK, workerTaskResultQueue content: [{"taskRun":{"id":"38CmxMyg4oNH1pE4iWdloE","executionId":"3Q7rZik7tcOYfHsJ7asKTJ","namespace":"io.kestra.unit-test","flowId":"CrwiXhsxl89kra9FaGIct","taskId":"unit-test","attempts":[{"state":{"current":"RUNNING","histories":[{"state":"CREATED","date":"2025-10-31T17:17:51.478258Z"},{"state":"RUNNING","date":"2025-10-31T17:17:51.478259Z"}],"duration":0.000001000,"startDate":"2025-10-31T17:17:51.478258Z"},"workerId":"1yOE3wnAm182J5NCQ4yoQX"}],"outputs":null,"state":{"current":"RUNNING","histories":[{"state":"CREATED","date":"2025-10-31T17:17:51.458092Z"},{"state":"RUNNING","date":"2025-10-31T17:17:51.475001Z"}],"duration":0.016909000,"startDate":"2025-10-31T17:17:51.458092Z"}},"dynamicTaskRuns":[]},{"taskRun":{"id":"38CmxMyg4oNH1pE4iWdloE","executionId":"3Q7rZik7tcOYfHsJ7asKTJ","namespace":"io.kestra.unit-test","flowId":"CrwiXhsxl89kra9FaGIct","taskId":"unit-test","attempts":[{"state":{"current":"FAILED","histories":[{"state":"CREATED","date":"2025-10-31T17:17:51.478258Z"},{"state":"RUNNING","date":"2025-10-31T17:17:51.478259Z"},{"state":"FAILED","date":"2025-10-31T17:17:51.520753Z"}],"duration":0.042495000,"startDate":"2025-10-31T17:17:51.478258Z","endDate":"2025-10-31T17:17:51.520753Z"},"workerId":"1yOE3wnAm182J5NCQ4yoQX"}],"outputs":{},"state":{"current":"FAILED","histories":[{"state":"CREATED","date":"2025-10-31T17:17:51.458092Z"},{"state":"RUNNING","date":"2025-10-31T17:17:51.475001Z"},{"state":"FAILED","date":"2025-10-31T17:17:51.520794Z"}],"duration":0.062702000,"startDate":"2025-10-31T17:17:51.458092Z","endDate":"2025-10-31T17:17:51.520794Z"}},"dynamicTaskRuns":[]}]
	at app//io.kestra.jdbc.runner.JdbcServiceLivenessCoordinatorTest.shouldReEmitTasksToTheSameWorkerGroup(JdbcServiceLivenessCoordinatorTest.java:170)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at app//io.micronaut.test.extensions.junit5.MicronautJunit5Extension$2.proceed(MicronautJunit5Extension.java:142)
	at app//io.micronaut.test.extensions.AbstractMicronautExtension.interceptEach(AbstractMicronautExtension.java:162)
	at app//io.micronaut.test.extensions.AbstractMicronautExtension.interceptTest(AbstractMicronautExtension.java:119)
	at app//io.micronaut.test.extensions.junit5.MicronautJunit5Extension.interceptTestMethod(MicronautJunit5Extension.java:129)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1312)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1843)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1808)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:188)


Flaky tests report quick summary:

failed ❌ > tests: 21, success: 17, skipped: 2, failed: 2

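| Project | Status | Success | Skipped | Failed |
| --- | --- | --- | --- | --- |
| cli | success ✅ | 2 | 0 | 0 |
| core | success ✅ | 5 | 0 | 0 |
| jdbc | success ✅ | 2 | 0 | 0 |
| jdbc-h2 | success ✅ | 2 | 0 | 0 |
| jdbc-mysql | failed ❌ | 3 | 0 | 1 |
| jdbc-postgres | failed ❌ | 3 | 0 | 1 |
| scheduler | success ✅ | 2 | 0 | 0 |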

Failed tests:

jdbc-mysql > io.kestra.runner.mysql.MysqlServiceLivenessCoordinatorTest > shouldReEmitTasksWhenWorkerIsDetectedAsNonResponding() failed ❌ in 12.124
org.opentest4j.AssertionFailedError: 
Expecting value to be true but was false
	at app//io.kestra.jdbc.runner.JdbcServiceLivenessCoordinatorTest.shouldReEmitTasksWhenWorkerIsDetectedAsNonResponding(JdbcServiceLivenessCoordinatorTest.java:129)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at app//io.micronaut.test.extensions.junit5.MicronautJunit5Extension$2.proceed(MicronautJunit5Extension.java:142)
	at app//io.micronaut.test.extensions.AbstractMicronautExtension.interceptEach(AbstractMicronautExtension.java:162)
	at app//io.micronaut.test.extensions.AbstractMicronautExtension.interceptTest(AbstractMicronautExtension.java:119)
	at app//io.micronaut.test.extensions.junit5.MicronautJunit5Extension.interceptTestMethod(MicronautJunit5Extension.java:129)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1312)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1843)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1808)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:188)

jdbc-postgres > io.kestra.schedulers.postgres.PostgresSchedulerScheduleTest > recoverNONELongRunningExecution() failed ❌ in 34.658
java.util.concurrent.TimeoutException: Await failed to terminate within PT20S.
	at io.kestra.core.utils.Await.until(Await.java:42)
	at io.kestra.core.utils.Await.until(Await.java:31)
	at io.kestra.scheduler.SchedulerScheduleTest.recoverNONELongRunningExecution(SchedulerScheduleTest.java:687)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at io.micronaut.test.extensions.junit5.MicronautJunit5Extension$2.proceed(MicronautJunit5Extension.java:142)
	at io.micronaut.test.extensions.AbstractMicronautExtension.interceptEach(AbstractMicronautExtension.java:162)
	at io.micronaut.test.extensions.AbstractMicronautExtension.interceptTest(AbstractMicronautExtension.java:119)
	at io.micronaut.test.extensions.junit5.MicronautJunit5Extension.interceptTestMethod(MicronautJunit5Extension.java:129)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1312)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1843)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1808)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:188)
