diff --git a/R/parallel-taskq.R b/R/parallel-taskq.R index 369b757b0..c718b0bb6 100644 --- a/R/parallel-taskq.R +++ b/R/parallel-taskq.R @@ -12,6 +12,7 @@ PROCESS_DONE <- 200L PROCESS_STARTED <- 201L PROCESS_MSG <- 301L +PROCESS_OUTPUT <- 302L PROCESS_EXITED <- 500L PROCESS_CRASHED <- 501L PROCESS_CLOSED <- 502L @@ -50,7 +51,9 @@ task_q <- R6::R6Class( state = "waiting", fun = I(list(fun)), args = I(list(args)), - worker = I(list(NULL)) + worker = I(list(NULL)), + path = args[[1]], + startup = I(list(NULL)) ) private$schedule() invisible(id) @@ -62,15 +65,49 @@ task_q <- R6::R6Class( if (x == Inf) -1 else as.integer(as.double(x, "secs") * 1000) } repeat { + pr <- vector(mode = "list", nrow(private$tasks)) topoll <- which(private$tasks$state == "running") - conns <- lapply( + pr[topoll] <- processx::poll( private$tasks$worker[topoll], - function(x) x$get_poll_connection() + as_ms(timeout) ) - pr <- processx::poll(conns, as_ms(timeout)) - ready <- topoll[pr == "ready"] - results <- lapply(ready, function(i) { - msg <- private$tasks$worker[[i]]$read() + results <- lapply(seq_along(pr), function(i) { + # nothing from this worker? + if (is.null(pr[[i]]) || all(pr[[i]] != "ready")) { + return() + } + + # there is a testthat message? + worker <- private$tasks$worker[[i]] + msg <- if (pr[[i]][["process"]] == "ready") { + worker$read() + } + + # there is an output message? + has_output <- pr[[i]][["output"]] == "ready" || + pr[[i]][["error"]] == "ready" + outmsg <- NULL + if (has_output) { + lns <- c(worker$read_output_lines(), worker$read_error_lines()) + inc <- paste0(worker$read_output(), worker$read_error()) + if (nchar(inc)) { + lns <- c(lns, strsplit(inc, "\n", fixed = TRUE)[[1]]) + } + # startup message? + if (is.na(private$tasks$path[i])) { + private$tasks$startup[[i]] <- c(private$tasks$startup[[i]], lns) + } else { + outmsg <- structure( + list( + code = PROCESS_OUTPUT, + message = lns, + path = private$tasks$path[i] + ), + class = "testthat_message" + ) + } + } + ## TODO: why can this be NULL? if (is.null(msg) || msg$code == PROCESS_MSG) { private$tasks$state[[i]] <- "running" @@ -97,9 +134,10 @@ task_q <- R6::R6Class( class = c("testthat_process_error", "testthat_error") ) } - msg + compact(list(msg, outmsg)) }) - results <- results[!map_lgl(results, is.null)] + # single list for all workers + results <- compact(unlist(results, recursive = FALSE)) private$schedule() if (is.finite(timeout)) { @@ -129,9 +167,11 @@ task_q <- R6::R6Class( state = "running", fun = nl, args = nl, - worker = nl + worker = nl, + path = NA_character_, + startup = nl ) - rsopts <- callr::r_session_options(...) + rsopts <- callr::r_session_options(stdout = "|", stderr = "|", ...) for (i in seq_len(concurrency)) { rs <- callr::r_session$new(rsopts, wait = FALSE) private$tasks$worker[[i]] <- rs @@ -173,7 +213,10 @@ task_q <- R6::R6Class( file <- private$tasks$args[[task_no]][[1]] if (is.null(fun)) { msg$error$stdout <- msg$stdout - msg$error$stderr <- msg$stderr + msg$error$stderr <- paste( + c(private$tasks$startup[[task_no]], msg$stderr), + collapse = "\n" + ) abort( paste0( "testthat subprocess failed to start, stderr:\n", diff --git a/R/parallel.R b/R/parallel.R index 57221100f..0445e50a9 100644 --- a/R/parallel.R +++ b/R/parallel.R @@ -143,6 +143,12 @@ parallel_event_loop_smooth <- function(queue, reporters, test_dir) { updated <- FALSE for (x in msgs) { + if (x$code == PROCESS_OUTPUT) { + lns <- paste0("> ", x$path, ": ", x$message) + cat("\n", file = stdout()) + base::writeLines(lns, stdout()) + next + } if (x$code != PROCESS_MSG) { next } @@ -178,6 +184,11 @@ parallel_event_loop_chunky <- function(queue, reporters, test_dir) { while (!queue$is_idle()) { msgs <- queue$poll(Inf) for (x in msgs) { + if (x$code == PROCESS_OUTPUT) { + lns <- paste0("> ", x$path, ": ", x$message) + base::writeLines(lns, stdout()) + next + } if (x$code != PROCESS_MSG) { next } diff --git a/tests/testthat/test-parallel-stdout.R b/tests/testthat/test-parallel-stdout.R new file mode 100644 index 000000000..77ac0fa3d --- /dev/null +++ b/tests/testthat/test-parallel-stdout.R @@ -0,0 +1,17 @@ +test_that("stdout/stderr in parallel code", { + skip_on_covr() + withr::local_envvar(TESTTHAT_PARALLEL = "TRUE") + out <- capture.output(suppressMessages(testthat::test_local( + test_path("test-parallel", "stdout"), + reporter = "summary" + ))) + expect_true("> test-stdout-2.R: This is a message!" %in% out) + expect_true(any(grepl("test-stdout-3.R: [1] 1 2 3", out, fixed = TRUE))) + + out2 <- capture.output(suppressMessages(testthat::test_local( + test_path("test-parallel", "stdout"), + reporter = "progress" + ))) + expect_true("> test-stdout-2.R: This is a message!" %in% out2) + expect_true(any(grepl("test-stdout-3.R: [1] 1 2 3", out2, fixed = TRUE))) +}) diff --git a/tests/testthat/test-parallel/stdout/DESCRIPTION b/tests/testthat/test-parallel/stdout/DESCRIPTION new file mode 100644 index 000000000..95f688e6e --- /dev/null +++ b/tests/testthat/test-parallel/stdout/DESCRIPTION @@ -0,0 +1,20 @@ +Package: setup +Title: What the Package Does (One Line, Title Case) +Version: 0.0.0.9000 +Authors@R: + person(given = "First", + family = "Last", + role = c("aut", "cre"), + email = "first.last@example.com", + comment = c(ORCID = "YOUR-ORCID-ID")) +Description: What the package does (one paragraph). +License: `use_mit_license()`, `use_gpl3_license()` or friends to + pick a license +Encoding: UTF-8 +LazyData: true +Roxygen: list(markdown = TRUE) +RoxygenNote: 7.1.1 +Suggests: + testthat +Config/testthat/parallel: true +Config/testthat/edition: 3 diff --git a/tests/testthat/test-parallel/stdout/NAMESPACE b/tests/testthat/test-parallel/stdout/NAMESPACE new file mode 100644 index 000000000..6ae926839 --- /dev/null +++ b/tests/testthat/test-parallel/stdout/NAMESPACE @@ -0,0 +1,2 @@ +# Generated by roxygen2: do not edit by hand + diff --git a/tests/testthat/test-parallel/stdout/tests/testthat.R b/tests/testthat/test-parallel/stdout/tests/testthat.R new file mode 100644 index 000000000..abb06112c --- /dev/null +++ b/tests/testthat/test-parallel/stdout/tests/testthat.R @@ -0,0 +1,4 @@ +library(testthat) +library(ok) + +test_check("ok") diff --git a/tests/testthat/test-parallel/stdout/tests/testthat/test-stdout-1.R b/tests/testthat/test-parallel/stdout/tests/testthat/test-stdout-1.R new file mode 100644 index 000000000..c051c2dfc --- /dev/null +++ b/tests/testthat/test-parallel/stdout/tests/testthat/test-stdout-1.R @@ -0,0 +1,3 @@ +test_that("this is good", { + expect_equal(2 * 2, 4) +}) diff --git a/tests/testthat/test-parallel/stdout/tests/testthat/test-stdout-2.R b/tests/testthat/test-parallel/stdout/tests/testthat/test-stdout-2.R new file mode 100644 index 000000000..3096c244e --- /dev/null +++ b/tests/testthat/test-parallel/stdout/tests/testthat/test-stdout-2.R @@ -0,0 +1,4 @@ +test_that("this messages", { + message("This is a message!") + expect_true(TRUE) +}) diff --git a/tests/testthat/test-parallel/stdout/tests/testthat/test-stdout-3.R b/tests/testthat/test-parallel/stdout/tests/testthat/test-stdout-3.R new file mode 100644 index 000000000..a2ba80df2 --- /dev/null +++ b/tests/testthat/test-parallel/stdout/tests/testthat/test-stdout-3.R @@ -0,0 +1,4 @@ +test_that("this prints and skips", { + print(1:30) + skip(paste("This is", Sys.getpid())) +})