CoroutineStart.UNDISPATCHED flaky with collectLatest #4383

@vRallev

Description

This test is flaky:

@Test
fun `CoroutineStart_UNDISPATCHED is flaky when using collectLatest`() = repeat(10) {
    runBlocking(Dispatchers.IO) {
        println("Iteration $it")

        val flow = MutableSharedFlow<Int>(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
        var valueReceived = false

        val job = launch(start = CoroutineStart.UNDISPATCHED) {
            flow.collectLatest {
                valueReceived = true
            }
        }

        check(flow.tryEmit(1))

        withTimeout(2_000L) {
            while (!valueReceived) {
                delay(20L)
            }
        }

        job.cancel()
    }
}

The goal is to launch a job and start collecting the flow before the next statement runs. In the sample, at the moment tryEmit is called there is sometimes a subscriber and sometimes not. This behavior is surprising and unintuitive.

I found multiple workarounds, but ideally the sample would work as written:

  1. Use collect instead of collectLatest. With that the test is green, but in my production code I need the behavior of collectLatest.
  2. Use replay = 1 for the shared flow. Then the missed value would be replayed. But this isn't ideal when more values are emitted or if there are other subscribers that don't want the replay behavior.
  3. Use Dispatchers.Unconfined when launching the job. This is a significant change with other side effects.
  4. Add a delay before emitting the value.
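
A deterministic variant of workaround 4 might wait for the subscriber instead of sleeping for a fixed time. The sketch below is not taken from the thread: it assumes that suspending on `subscriptionCount` (a `StateFlow<Int>` exposed by `MutableSharedFlow`) is acceptable in the calling code, and the function name `emitAfterSubscription` is hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*

// Hypothetical helper for illustration: emits only once a subscriber is registered.
fun emitAfterSubscription(): Int = runBlocking(Dispatchers.IO) {
    val flow = MutableSharedFlow<Int>(
        extraBufferCapacity = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST,
    )
    val received = CompletableDeferred<Int>()

    val job = launch(start = CoroutineStart.UNDISPATCHED) {
        flow.collectLatest { received.complete(it) }
    }

    // Suspend until the coroutine that collectLatest starts internally has subscribed.
    flow.subscriptionCount.first { it > 0 }

    check(flow.tryEmit(1))
    val value = withTimeout(2_000L) { received.await() }
    job.cancel()
    value
}

fun main() {
    println(emitAfterSubscription())
}
```

Unlike a fixed delay, this waits only as long as the subscription takes, but it still adds a suspension point before the first emission.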

Is this behavior expected? What is the recommendation for this use case?

Activity

dkhalanskyjb commented on Apr 23, 2025

Collaborator

First of all, yes, this is expected: collectLatest launches a separate coroutine in which it collects the upstream values, and at the moment of tryEmit, that internally launched coroutine may or may not have started. On a single-threaded dispatcher, as you can see by removing Dispatchers.IO from the runBlocking arguments, it will consistently not have started by the time tryEmit is called.
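
The difference can be observed by reading `subscriptionCount` right after the undispatched launch. This is a minimal sketch on the single-threaded `runBlocking` dispatcher; the helper `subscribersAfterLaunch` and its `useCollectLatest` switch are hypothetical names introduced for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*

// Returns how many subscribers the shared flow reports immediately after
// an UNDISPATCHED launch, before the launching coroutine suspends.
fun subscribersAfterLaunch(useCollectLatest: Boolean): Int = runBlocking {
    val flow = MutableSharedFlow<Int>(
        extraBufferCapacity = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST,
    )
    val job = launch(start = CoroutineStart.UNDISPATCHED) {
        if (useCollectLatest) {
            // Subscribes from a coroutine that collectLatest starts internally.
            flow.collectLatest { }
        } else {
            // Subscribes directly in this coroutine, before its first suspension.
            flow.collect { }
        }
    }
    val count = flow.subscriptionCount.value
    job.cancel()
    count
}

fun main() {
    println("collect: ${subscribersAfterLaunch(useCollectLatest = false)}")
    println("collectLatest: ${subscribersAfterLaunch(useCollectLatest = true)}")
}
```

With `collect` the count should be 1. With `collectLatest`, the behavior described above implies 0, though that depends on the library version in use.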

That said, I do believe we should change this behavior. In this particular scenario where the upstream is executed in the same coroutine context as the downstream, there is no danger in immediately entering the internal coroutine and starting the collection, and the use case of reliably subscribing to a shared flow before starting emissions is valuable.

self-assigned this on Jul 7, 2025

Participants: @vRallev, @murfel, @dkhalanskyjb