Timeout not detected on newSingleThreadContext() when withTimeout wraps a blocking job #3875

@OliverO2 (Contributor)

If withTimeout wraps a blocking job, it does not detect a timeout in conjunction with newSingleThreadContext(). Timeout detection works as expected with other dispatcher flavors:

import kotlinx.coroutines.* // ktlint-disable no-wildcard-imports
import java.util.concurrent.Executors

@OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class)
fun main(): Unit = runBlocking {
    listOf(
        Triple("newSingleThreadExecutor", { Executors.newSingleThreadExecutor().asCoroutineDispatcher() }, true),
        Triple("newSingleThreadContext", { newSingleThreadContext("single") }, true),
        Triple("newFixedThreadPoolContext(2)", { newFixedThreadPoolContext(2, "double") }, true),
        Triple("IO.limitedParallelism(1)", { Dispatchers.IO.limitedParallelism(1) }, false),
        Triple("IO.limitedParallelism(2)", { Dispatchers.IO.limitedParallelism(2) }, false)
    ).forEach { (name, dispatcher, needsClosing) ->
        print("$name: ")
        val dispatcher = dispatcher()
        try {
            withContext(dispatcher) {
                try {
                    withTimeout(1000) {
                        launch {
                            Thread.sleep(2000)
                            // delay(2000)
                        }
                    }
                    println("no timeout detected")
                } catch (t: Throwable) {
                    println("$t")
                }
            }
        } finally {
            if (needsClosing) (dispatcher as ExecutorCoroutineDispatcher).close()
        }
    }
}

produces:

newSingleThreadExecutor: kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1000 ms
newSingleThreadContext: no timeout detected
newFixedThreadPoolContext(2): kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1000 ms
IO.limitedParallelism(1): kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1000 ms
IO.limitedParallelism(2): kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1000 ms

Version: kotlinx-coroutines-core:1.7.3

Effects observed here first: kotest/kotest#3672

Activity

dkhalanskyjb (Contributor) commented on Sep 4, 2023

What are you trying to achieve? withTimeout does not interrupt blocking code by itself; you need runInterruptible (https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-interruptible.html) for that. The only difference between the dispatchers is whether, on a timeout, an exception is thrown after the code finishes executing.
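
For illustration, here is a minimal sketch of the runInterruptible approach mentioned above. The helper name blockingCallTimesOut, the choice of Dispatchers.IO, and the durations are assumptions made for this example and do not come from the thread. Thread.sleep stands in for any blocking call that responds to thread interruption.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.runInterruptible
import kotlinx.coroutines.withTimeout
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

// Hypothetical helper: returns true if the blocking sleep was cut short by the timeout.
suspend fun blockingCallTimesOut(timeout: Duration, sleepMillis: Long): Boolean =
    try {
        withTimeout(timeout) {
            // On cancellation, runInterruptible interrupts the thread executing the block.
            runInterruptible(Dispatchers.IO) {
                Thread.sleep(sleepMillis) // stand-in for any interruptible blocking call
            }
        }
        false
    } catch (e: TimeoutCancellationException) {
        true
    }

fun main() = runBlocking {
    println("timed out: ${blockingCallTimesOut(100.milliseconds, 2_000)}")
}
```

With this shape, the blocking call is expected to be interrupted shortly after the timeout expires rather than running to completion. Blocking code that ignores interruption would still run to the end.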

OliverO2 (Contributor, Author) commented on Sep 4, 2023

That's of course all valid and has been noted in kotest/kotest#3672 (comment) and kotest/kotest#3672 (comment).

What the original test in question was trying to achieve can probably be best answered by @sschuberth.

The point here is that the timeout exception after the blocking section is usually available, but missing with one type of dispatcher. From a Kotest point of view, this change in behavior is considered a regression.

sschuberth commented on Sep 4, 2023

> What the original test in question was trying to achieve can probably be best answered by @sschuberth.

The code where this Kotest regression occurred is here; the test was written by @oheger-bosch. My interpretation is that the test should ensure that a call to fossId.scanPackage() blocks, and this is being tested by calling it inside withTimeout, expecting that we run into a TimeoutCancellationException.

OliverO2 (Contributor, Author) commented on Sep 4, 2023

I came up with another scenario: What happens if the timeout just occurs during a CPU-bound period, even if the coroutine is otherwise non-blocking? It seems that in this case newSingleThreadContext() also misses the timeout:

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExecutorCoroutineDispatcher
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.newFixedThreadPoolContext
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeout
import kotlinx.datetime.Clock
import java.util.concurrent.Executors
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

@OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class)
fun main(): Unit = runBlocking {
    mapOf<String, CoroutineScope.() -> Unit>(
        "launched Thread.sleep" to {
            launch {
                Thread.sleep(100 /* ms */)
            }
        },
        "launched delay + CPU hog" to {
            launch {
                delay(20.milliseconds)
                hogCpu(100.milliseconds)
            }
        }
    ).forEach { (blockingVariantName, blockingBlock) ->
        println("\n--- $blockingVariantName ---")
        listOf(
            Triple("newSingleThreadExecutor", { Executors.newSingleThreadExecutor().asCoroutineDispatcher() }, true),
            Triple("newSingleThreadContext", { newSingleThreadContext("single") }, true),
            Triple("newFixedThreadPoolContext(2)", { newFixedThreadPoolContext(2, "double") }, true),
            Triple("IO.limitedParallelism(1)", { Dispatchers.IO.limitedParallelism(1) }, false),
            Triple("IO.limitedParallelism(2)", { Dispatchers.IO.limitedParallelism(2) }, false)
        ).forEach { (name, dispatcher, needsClosing) ->
            print("$name: ")
            val dispatcher = dispatcher()
            val startInstant = Clock.System.now()
            try {
                withContext(dispatcher) {
                    try {
                        withTimeout(50.milliseconds) {
                            blockingBlock()
                        }
                        println("no timeout detected")
                    } catch (t: Throwable) {
                        println("$t")
                    }
                }
            } finally {
                if (needsClosing) (dispatcher as ExecutorCoroutineDispatcher).close()
            }
            println("– elapsed time: ${Clock.System.now() - startInstant}")
        }
    }
}

fun hogCpu(duration: Duration) {
    var spinCount = 0
    val startInstant = Clock.System.now()
    while (Clock.System.now() - startInstant < duration) {
        spinCount++
    }
    // println("- spinCount=$spinCount")
}

produces

--- launched Thread.sleep ---
newSingleThreadExecutor: kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 50 ms
– elapsed time: 120.536ms
newSingleThreadContext: no timeout detected
– elapsed time: 105.131ms
newFixedThreadPoolContext(2): kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 50 ms
– elapsed time: 102.367ms
IO.limitedParallelism(1): kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 50 ms
– elapsed time: 108.744ms
IO.limitedParallelism(2): kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 50 ms
– elapsed time: 102.126ms

--- launched delay + CPU hog ---
newSingleThreadExecutor: kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 50 ms
– elapsed time: 133.593ms
newSingleThreadContext: no timeout detected
– elapsed time: 121.507ms
newFixedThreadPoolContext(2): kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 50 ms
– elapsed time: 121.256ms
IO.limitedParallelism(1): kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 50 ms
– elapsed time: 121.249ms
IO.limitedParallelism(2): kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 50 ms
– elapsed time: 121.699ms

dkhalanskyjb (Contributor) commented on Sep 4, 2023

> My interpretation is that the test should ensure that a call to fossId.scanPackage() blocks, and this is being tested by calling it inside withTimeout, expecting that we run into a TimeoutCancellationException.

Suggestion:

val time = measureTime {
  // block taking a long time
}
assertTrue(time > 2.seconds)

This way, there's no illusion that the blocking code gets canceled.
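
A runnable sketch of this suggestion might look as follows. The names blockingCall and blockedFor, the 200 ms sleep, and the 150 ms threshold are hypothetical values chosen for illustration. The example assumes a Kotlin version in which kotlin.time.measureTime is available without opt-in.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.measureTime

// Hypothetical stand-in for the blocking call under test.
fun blockingCall() {
    Thread.sleep(200)
}

// Returns how long the given block kept the calling thread busy.
fun blockedFor(block: () -> Unit): Duration = measureTime(block)

fun main() {
    val time = blockedFor(::blockingCall)
    // The threshold is deliberately below the sleep duration to tolerate timer granularity.
    check(time >= 150.milliseconds) { "expected the call to block, but it returned after $time" }
    println("blocked for $time")
}
```

Such a test always takes at least as long as the blocking call itself, since nothing cancels it.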

> What happens if the timeout just occurs during a CPU-bound period

In this case, you're supposed to call ensureActive periodically during your work. Otherwise, there's no way for withTimeout to cancel the block of code.
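
As a rough sketch of that advice, the CPU-bound loop below checks for cancellation on every pass. The names cooperativeHogCpu and hogTimesOut are hypothetical, and running on Dispatchers.IO is an assumption made so that the example uses a dispatcher family that reported the timeout in the output above.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeout
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

// Hypothetical CPU-bound loop: spins for `duration`, but checks for cancellation on each pass.
suspend fun cooperativeHogCpu(duration: Duration) {
    val start = System.nanoTime()
    while (System.nanoTime() - start < duration.inWholeNanoseconds) {
        // Throws a CancellationException once the surrounding coroutine has been cancelled.
        currentCoroutineContext().ensureActive()
    }
}

// Hypothetical helper: returns true if the spinning loop was stopped by the timeout.
suspend fun hogTimesOut(): Boolean =
    try {
        withContext(Dispatchers.IO) {
            withTimeout(50.milliseconds) { cooperativeHogCpu(2_000.milliseconds) }
        }
        false
    } catch (e: TimeoutCancellationException) {
        true
    }

fun main() = runBlocking {
    println("timed out: ${hogTimesOut()}")
}
```

In real code, checking on every iteration may be too frequent; checking every N iterations is a common compromise.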

OliverO2 (Contributor, Author) commented on Sep 4, 2023

> In this case, you're supposed to call ensureActive periodically during your work. Otherwise, there's no way for withTimeout to cancel the block of code.

(In my own code, I'm actually using isActive and yield() in CPU-bound blocks.)

Regarding this case, as a library user, without looking into the actual kotlinx.coroutines code, my mental model is as follows:

  • withTimeout wraps a block like coroutineScope does.
  • At the end, it waits like joinAll() for coroutines launched in that block.
  • I'd expect that (at least conceptual) joinAll() to be the place where the cancellation would happen (and the timeout be detected for blocking jobs).

If that is conceptually correct: If a job, which is waited for, happens to be consuming the CPU for a very brief period at the end, and in exactly that period the timeout expires, doesn't that seem racy?

Note that I'm not expecting a cancellation to interrupt that (briefly) blocking job. Just to detect the timeout.

sschuberth commented on Sep 4, 2023

> Suggestion:

Thanks, but how do you then keep the test from really taking "a long time"? The test case should complete as quickly as possible.

OliverO2 (Contributor, Author) commented on Sep 4, 2023

> Thanks, but how do you then keep the test from really taking "a long time"? The test case should complete as quickly as possible.

In this case you should use runInterruptible or the corresponding Kotest blocking/timeout configuration.

sschuberth commented on Sep 4, 2023

> or the corresponding Kotest blocking/timeout configuration.

I just looked at the implementation for Kotest's shouldTimeout, and it turns out it does the same thing that we did manually, see

https://github.com/kotest/kotest/blob/402597ca4bf581f10df2f3d062a2427e0de2d005/kotest-assertions/kotest-assertions-shared/src/jvmMain/kotlin/io/kotest/assertions/async/timeout.kt#L18-L26

So I guess this needs some fixing as well?

dkhalanskyjb (Contributor) commented on Sep 4, 2023

If it's for running suspending code, it seems perfectly fine. The problem is only about code that doesn't participate in cooperative cancellation.
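
To illustrate the suspending case, here is a minimal sketch in which the launched child only suspends via delay and therefore takes part in cooperative cancellation. The helper name suspendingChildTimesOut and the durations are hypothetical.

```kotlin
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import kotlin.time.Duration.Companion.milliseconds

// Hypothetical helper: returns true if withTimeout cancelled the suspending child.
suspend fun suspendingChildTimesOut(): Boolean =
    try {
        withTimeout(50.milliseconds) {
            // delay() is a suspension point, so the child can be cancelled while it waits.
            launch { delay(2_000.milliseconds) }
        }
        false
    } catch (e: TimeoutCancellationException) {
        true
    }

fun main() = runBlocking {
    println("timed out: ${suspendingChildTimesOut()}")
}
```

Because the child never blocks its thread, the timeout can cancel it without runInterruptible or manual ensureActive checks.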

