From 1a01e1ad2dda25064affbc7bf9c41515edf74d64 Mon Sep 17 00:00:00 2001 From: Hadley Wickham Date: Sun, 3 Aug 2025 19:51:05 -0500 Subject: [PATCH 1/7] =?UTF-8?q?=E2=9C=A8=20Forward=20stdout/stderr=20from?= =?UTF-8?q?=20parallel=20tests=20=E2=9C=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit I guess the question here is do we want to forward as we update the number of tests, or only at the end when we report the test failures etc. I can see arguments for either Fixes #2095 --- .claude/settings.local.json | 7 ++++--- R/parallel-taskq.R | 22 ++++++++++++++++++++-- tests/testthat/test-reporter-silent.R | 2 ++ 3 files changed, 26 insertions(+), 5 deletions(-) diff --git a/.claude/settings.local.json b/.claude/settings.local.json index 004f6006a..89cb56882 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -1,10 +1,11 @@ { + "$schema": "https://json.schemastore.org/claude-code-settings.json", "permissions": { "allow": [ "Bash(find:*)", - "Bash(R:*)" + "Bash(R:*)", + "Bash(air format:*)" ], "deny": [] - }, - "$schema": "https://json.schemastore.org/claude-code-settings.json" + } } \ No newline at end of file diff --git a/R/parallel-taskq.R b/R/parallel-taskq.R index 369b757b0..431120e68 100644 --- a/R/parallel-taskq.R +++ b/R/parallel-taskq.R @@ -70,7 +70,25 @@ task_q <- R6::R6Class( pr <- processx::poll(conns, as_ms(timeout)) ready <- topoll[pr == "ready"] results <- lapply(ready, function(i) { - msg <- private$tasks$worker[[i]]$read() + worker <- private$tasks$worker[[i]] + msg <- worker$read() + + # Also read any available stdout/stderr + stdout_lines <- worker$read_output_lines() + stderr_lines <- worker$read_error_lines() + + # Forward stdout/stderr to console + if (length(stdout_lines) > 0) { + for (line in stdout_lines) { + cat(line, "\n", file = stdout()) + } + } + if (length(stderr_lines) > 0) { + for (line in stderr_lines) { + cat(line, "\n", file = stderr()) + } + } + ## TODO: why can this be NULL? if (is.null(msg) || msg$code == PROCESS_MSG) { private$tasks$state[[i]] <- "running" @@ -131,7 +149,7 @@ task_q <- R6::R6Class( args = nl, worker = nl ) - rsopts <- callr::r_session_options(...) + rsopts <- callr::r_session_options(stdout = "|", stderr = "|", ...) for (i in seq_len(concurrency)) { rs <- callr::r_session$new(rsopts, wait = FALSE) private$tasks$worker[[i]] <- rs diff --git a/tests/testthat/test-reporter-silent.R b/tests/testthat/test-reporter-silent.R index bbfee4079..f7838b1e5 100644 --- a/tests/testthat/test-reporter-silent.R +++ b/tests/testthat/test-reporter-silent.R @@ -1,3 +1,5 @@ +cat("\nHello?\n") + test_that("captures expectations; doesn't produce any output", { reporter <- SilentReporter$new() expect_snapshot_reporter(reporter) From 95a1fd46ba6cbd110af9319551befab726f1f7b4 Mon Sep 17 00:00:00 2001 From: Hadley Wickham Date: Mon, 4 Aug 2025 07:50:37 -0500 Subject: [PATCH 2/7] Remove test code --- tests/testthat/test-reporter-silent.R | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/testthat/test-reporter-silent.R b/tests/testthat/test-reporter-silent.R index f7838b1e5..bbfee4079 100644 --- a/tests/testthat/test-reporter-silent.R +++ b/tests/testthat/test-reporter-silent.R @@ -1,5 +1,3 @@ -cat("\nHello?\n") - test_that("captures expectations; doesn't produce any output", { reporter <- SilentReporter$new() expect_snapshot_reporter(reporter) From 43e5b8065a98c33082a75df1a3e7f4b38252adc0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Mon, 4 Aug 2025 17:35:46 +0200 Subject: [PATCH 3/7] Improve printing stdout/stderr from parallel workers --- R/parallel-taskq.R | 58 +++++++++++++++++++++------------- R/parallel.R | 11 ++++++- tests/testthat/test-bare.R | 5 +++ tests/testthat/test-describe.R | 1 + 4 files changed, 52 insertions(+), 23 deletions(-) diff --git a/R/parallel-taskq.R b/R/parallel-taskq.R index 431120e68..cd9694c88 100644 --- a/R/parallel-taskq.R +++ b/R/parallel-taskq.R @@ -12,6 +12,7 @@ PROCESS_DONE <- 200L PROCESS_STARTED <- 201L PROCESS_MSG <- 301L +PROCESS_OUTPUT <- 302L PROCESS_EXITED <- 500L PROCESS_CRASHED <- 501L PROCESS_CLOSED <- 502L @@ -50,7 +51,8 @@ task_q <- R6::R6Class( state = "waiting", fun = I(list(fun)), args = I(list(args)), - worker = I(list(NULL)) + worker = I(list(NULL)), + path = args[[1]] ) private$schedule() invisible(id) @@ -62,33 +64,40 @@ task_q <- R6::R6Class( if (x == Inf) -1 else as.integer(as.double(x, "secs") * 1000) } repeat { + pr <- vector(mode = "list", nrow(private$tasks)) topoll <- which(private$tasks$state == "running") - conns <- lapply( + pr[topoll] <- processx::poll( private$tasks$worker[topoll], - function(x) x$get_poll_connection() + as_ms(timeout) ) - pr <- processx::poll(conns, as_ms(timeout)) - ready <- topoll[pr == "ready"] - results <- lapply(ready, function(i) { + results <- lapply(seq_along(pr), function(i) { + if (is.null(pr[[i]]) || all(pr[[i]] != "ready")) { + return() + } worker <- private$tasks$worker[[i]] - msg <- worker$read() - - # Also read any available stdout/stderr - stdout_lines <- worker$read_output_lines() - stderr_lines <- worker$read_error_lines() - - # Forward stdout/stderr to console - if (length(stdout_lines) > 0) { - for (line in stdout_lines) { - cat(line, "\n", file = stdout()) + msgs <- NULL + if (pr[[i]][["output"]] == "ready" || pr[[i]][["error"]] == "ready") { + lns <- c(worker$read_output_lines(), worker$read_error_lines()) + inc <- paste0(worker$read_output(), worker$read_error()) + if (nchar(inc)) { + lns <- c(lns, strsplit(inc, "\n", fixed = TRUE)[[1]]) } + msg <- structure( + list( + code = PROCESS_OUTPUT, + message = lns, + path = private$tasks$path[i] + ), + class = "testthat_message" + ) + msgs <- list(msg) } - if (length(stderr_lines) > 0) { - for (line in stderr_lines) { - cat(line, "\n", file = stderr()) - } + if (pr[[i]][["process"]] != "ready") { + return(msgs) } + pri <- msg <- worker$read() + ## TODO: why can this be NULL? if (is.null(msg) || msg$code == PROCESS_MSG) { private$tasks$state[[i]] <- "running" @@ -115,9 +124,13 @@ task_q <- R6::R6Class( class = c("testthat_process_error", "testthat_error") ) } - msg + if (!is.null(msg)) { + msgs <- c(msgs, list(msg)) + } + msgs }) results <- results[!map_lgl(results, is.null)] + results <- unlist(results, recursive = FALSE) private$schedule() if (is.finite(timeout)) { @@ -147,7 +160,8 @@ task_q <- R6::R6Class( state = "running", fun = nl, args = nl, - worker = nl + worker = nl, + path = NA_character_ ) rsopts <- callr::r_session_options(stdout = "|", stderr = "|", ...) for (i in seq_len(concurrency)) { diff --git a/R/parallel.R b/R/parallel.R index 57221100f..9daee7b9b 100644 --- a/R/parallel.R +++ b/R/parallel.R @@ -143,6 +143,12 @@ parallel_event_loop_smooth <- function(queue, reporters, test_dir) { updated <- FALSE for (x in msgs) { + if (x$code == PROCESS_OUTPUT) { + lns <- paste0(x$path, ": ", x$message) + cat("\n", file = stdout()) + base::writeLines(lns, stdout()) + next + } if (x$code != PROCESS_MSG) { next } @@ -204,7 +210,10 @@ parallel_event_loop_chunky <- function(queue, reporters, test_dir) { replay_events <- function(reporter, events) { snapshotter <- getOption("testthat.snapshotter") for (m in events) { - if (m$type == "snapshotter") { + if (m$code == PROCESS_OUTPUT) { + lns <- paste0(m$path, ": ", m$message) + base::writeLines(lns, stdout()) + } else if (m$type == "snapshotter") { do.call(snapshotter[[m$cmd]], m$args) } else { do.call(reporter[[m$cmd]], m$args) diff --git a/tests/testthat/test-bare.R b/tests/testthat/test-bare.R index 84ba12892..1d345ea14 100644 --- a/tests/testthat/test-bare.R +++ b/tests/testthat/test-bare.R @@ -14,3 +14,8 @@ stopifnot( expectation_failure = function(e) TRUE ) ) + +test_that("output", { + message("hello!") + expect_true(TRUE) +}) diff --git a/tests/testthat/test-describe.R b/tests/testthat/test-describe.R index 1309efec1..67153304e 100644 --- a/tests/testthat/test-describe.R +++ b/tests/testthat/test-describe.R @@ -50,4 +50,5 @@ describe("describe", { it("should not be possible to access variables from other specs (2)", { expect_false(exists("some_test_var")) }) + print(1:100) }) From c66e454e61b3737ba0d004de3796971743cbe59e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Tue, 5 Aug 2025 08:12:55 +0200 Subject: [PATCH 4/7] Fix parallel startup error message Now that we read out stdout/stderr explicitly, we might read it out before the error is reported from a failed worker startup. Or if a task is a startup and not a test file (`path == NA`), then we put the stdout/stderr aside and use it in the error message. --- R/parallel-taskq.R | 61 +++++++++++++++++++++++++++------------------- 1 file changed, 36 insertions(+), 25 deletions(-) diff --git a/R/parallel-taskq.R b/R/parallel-taskq.R index cd9694c88..c718b0bb6 100644 --- a/R/parallel-taskq.R +++ b/R/parallel-taskq.R @@ -52,7 +52,8 @@ task_q <- R6::R6Class( fun = I(list(fun)), args = I(list(args)), worker = I(list(NULL)), - path = args[[1]] + path = args[[1]], + startup = I(list(NULL)) ) private$schedule() invisible(id) @@ -71,33 +72,42 @@ task_q <- R6::R6Class( as_ms(timeout) ) results <- lapply(seq_along(pr), function(i) { + # nothing from this worker? if (is.null(pr[[i]]) || all(pr[[i]] != "ready")) { return() } + + # there is a testthat message? worker <- private$tasks$worker[[i]] - msgs <- NULL - if (pr[[i]][["output"]] == "ready" || pr[[i]][["error"]] == "ready") { + msg <- if (pr[[i]][["process"]] == "ready") { + worker$read() + } + + # there is an output message? + has_output <- pr[[i]][["output"]] == "ready" || + pr[[i]][["error"]] == "ready" + outmsg <- NULL + if (has_output) { lns <- c(worker$read_output_lines(), worker$read_error_lines()) inc <- paste0(worker$read_output(), worker$read_error()) if (nchar(inc)) { lns <- c(lns, strsplit(inc, "\n", fixed = TRUE)[[1]]) } - msg <- structure( - list( - code = PROCESS_OUTPUT, - message = lns, - path = private$tasks$path[i] - ), - class = "testthat_message" - ) - msgs <- list(msg) - } - if (pr[[i]][["process"]] != "ready") { - return(msgs) + # startup message? + if (is.na(private$tasks$path[i])) { + private$tasks$startup[[i]] <- c(private$tasks$startup[[i]], lns) + } else { + outmsg <- structure( + list( + code = PROCESS_OUTPUT, + message = lns, + path = private$tasks$path[i] + ), + class = "testthat_message" + ) + } } - pri <- msg <- worker$read() - ## TODO: why can this be NULL? if (is.null(msg) || msg$code == PROCESS_MSG) { private$tasks$state[[i]] <- "running" @@ -124,13 +134,10 @@ task_q <- R6::R6Class( class = c("testthat_process_error", "testthat_error") ) } - if (!is.null(msg)) { - msgs <- c(msgs, list(msg)) - } - msgs + compact(list(msg, outmsg)) }) - results <- results[!map_lgl(results, is.null)] - results <- unlist(results, recursive = FALSE) + # single list for all workers + results <- compact(unlist(results, recursive = FALSE)) private$schedule() if (is.finite(timeout)) { @@ -161,7 +168,8 @@ task_q <- R6::R6Class( fun = nl, args = nl, worker = nl, - path = NA_character_ + path = NA_character_, + startup = nl ) rsopts <- callr::r_session_options(stdout = "|", stderr = "|", ...) for (i in seq_len(concurrency)) { @@ -205,7 +213,10 @@ task_q <- R6::R6Class( file <- private$tasks$args[[task_no]][[1]] if (is.null(fun)) { msg$error$stdout <- msg$stdout - msg$error$stderr <- msg$stderr + msg$error$stderr <- paste( + c(private$tasks$startup[[task_no]], msg$stderr), + collapse = "\n" + ) abort( paste0( "testthat subprocess failed to start, stderr:\n", From 602fc609d444b94e3032de463592416e3d5b4c8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Tue, 5 Aug 2025 09:16:39 +0200 Subject: [PATCH 5/7] Fix printing parallel stdout w/ chunky event loop --- R/parallel.R | 12 +++++++----- tests/testthat/test-describe.R | 1 + 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/R/parallel.R b/R/parallel.R index 9daee7b9b..0445e50a9 100644 --- a/R/parallel.R +++ b/R/parallel.R @@ -144,7 +144,7 @@ parallel_event_loop_smooth <- function(queue, reporters, test_dir) { updated <- FALSE for (x in msgs) { if (x$code == PROCESS_OUTPUT) { - lns <- paste0(x$path, ": ", x$message) + lns <- paste0("> ", x$path, ": ", x$message) cat("\n", file = stdout()) base::writeLines(lns, stdout()) next @@ -184,6 +184,11 @@ parallel_event_loop_chunky <- function(queue, reporters, test_dir) { while (!queue$is_idle()) { msgs <- queue$poll(Inf) for (x in msgs) { + if (x$code == PROCESS_OUTPUT) { + lns <- paste0("> ", x$path, ": ", x$message) + base::writeLines(lns, stdout()) + next + } if (x$code != PROCESS_MSG) { next } @@ -210,10 +215,7 @@ parallel_event_loop_chunky <- function(queue, reporters, test_dir) { replay_events <- function(reporter, events) { snapshotter <- getOption("testthat.snapshotter") for (m in events) { - if (m$code == PROCESS_OUTPUT) { - lns <- paste0(m$path, ": ", m$message) - base::writeLines(lns, stdout()) - } else if (m$type == "snapshotter") { + if (m$type == "snapshotter") { do.call(snapshotter[[m$cmd]], m$args) } else { do.call(reporter[[m$cmd]], m$args) diff --git a/tests/testthat/test-describe.R b/tests/testthat/test-describe.R index 67153304e..f2276b553 100644 --- a/tests/testthat/test-describe.R +++ b/tests/testthat/test-describe.R @@ -50,5 +50,6 @@ describe("describe", { it("should not be possible to access variables from other specs (2)", { expect_false(exists("some_test_var")) }) + withr::local_options(width = 60) print(1:100) }) From 6fd4371e7423c11dab19e5ed966bbd85c81f0a81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Tue, 5 Aug 2025 09:23:39 +0200 Subject: [PATCH 6/7] Add tests for stdout/stderr from parallel workers --- tests/testthat/test-bare.R | 1 - tests/testthat/test-describe.R | 2 -- tests/testthat/test-parallel-stdout.R | 17 ++++++++++++++++ .../testthat/test-parallel/stdout/DESCRIPTION | 20 +++++++++++++++++++ tests/testthat/test-parallel/stdout/NAMESPACE | 2 ++ .../test-parallel/stdout/tests/testthat.R | 4 ++++ .../stdout/tests/testthat/test-stdout-1.R | 3 +++ .../stdout/tests/testthat/test-stdout-2.R | 4 ++++ .../stdout/tests/testthat/test-stdout-3.R | 4 ++++ 9 files changed, 54 insertions(+), 3 deletions(-) create mode 100644 tests/testthat/test-parallel-stdout.R create mode 100644 tests/testthat/test-parallel/stdout/DESCRIPTION create mode 100644 tests/testthat/test-parallel/stdout/NAMESPACE create mode 100644 tests/testthat/test-parallel/stdout/tests/testthat.R create mode 100644 tests/testthat/test-parallel/stdout/tests/testthat/test-stdout-1.R create mode 100644 tests/testthat/test-parallel/stdout/tests/testthat/test-stdout-2.R create mode 100644 tests/testthat/test-parallel/stdout/tests/testthat/test-stdout-3.R diff --git a/tests/testthat/test-bare.R b/tests/testthat/test-bare.R index 1d345ea14..28818e646 100644 --- a/tests/testthat/test-bare.R +++ b/tests/testthat/test-bare.R @@ -16,6 +16,5 @@ stopifnot( ) test_that("output", { - message("hello!") expect_true(TRUE) }) diff --git a/tests/testthat/test-describe.R b/tests/testthat/test-describe.R index f2276b553..1309efec1 100644 --- a/tests/testthat/test-describe.R +++ b/tests/testthat/test-describe.R @@ -50,6 +50,4 @@ describe("describe", { it("should not be possible to access variables from other specs (2)", { expect_false(exists("some_test_var")) }) - withr::local_options(width = 60) - print(1:100) }) diff --git a/tests/testthat/test-parallel-stdout.R b/tests/testthat/test-parallel-stdout.R new file mode 100644 index 000000000..77ac0fa3d --- /dev/null +++ b/tests/testthat/test-parallel-stdout.R @@ -0,0 +1,17 @@ +test_that("stdout/stderr in parallel code", { + skip_on_covr() + withr::local_envvar(TESTTHAT_PARALLEL = "TRUE") + out <- capture.output(suppressMessages(testthat::test_local( + test_path("test-parallel", "stdout"), + reporter = "summary" + ))) + expect_true("> test-stdout-2.R: This is a message!" %in% out) + expect_true(any(grepl("test-stdout-3.R: [1] 1 2 3", out, fixed = TRUE))) + + out2 <- capture.output(suppressMessages(testthat::test_local( + test_path("test-parallel", "stdout"), + reporter = "progress" + ))) + expect_true("> test-stdout-2.R: This is a message!" %in% out2) + expect_true(any(grepl("test-stdout-3.R: [1] 1 2 3", out2, fixed = TRUE))) +}) diff --git a/tests/testthat/test-parallel/stdout/DESCRIPTION b/tests/testthat/test-parallel/stdout/DESCRIPTION new file mode 100644 index 000000000..95f688e6e --- /dev/null +++ b/tests/testthat/test-parallel/stdout/DESCRIPTION @@ -0,0 +1,20 @@ +Package: setup +Title: What the Package Does (One Line, Title Case) +Version: 0.0.0.9000 +Authors@R: + person(given = "First", + family = "Last", + role = c("aut", "cre"), + email = "first.last@example.com", + comment = c(ORCID = "YOUR-ORCID-ID")) +Description: What the package does (one paragraph). +License: `use_mit_license()`, `use_gpl3_license()` or friends to + pick a license +Encoding: UTF-8 +LazyData: true +Roxygen: list(markdown = TRUE) +RoxygenNote: 7.1.1 +Suggests: + testthat +Config/testthat/parallel: true +Config/testthat/edition: 3 diff --git a/tests/testthat/test-parallel/stdout/NAMESPACE b/tests/testthat/test-parallel/stdout/NAMESPACE new file mode 100644 index 000000000..6ae926839 --- /dev/null +++ b/tests/testthat/test-parallel/stdout/NAMESPACE @@ -0,0 +1,2 @@ +# Generated by roxygen2: do not edit by hand + diff --git a/tests/testthat/test-parallel/stdout/tests/testthat.R b/tests/testthat/test-parallel/stdout/tests/testthat.R new file mode 100644 index 000000000..abb06112c --- /dev/null +++ b/tests/testthat/test-parallel/stdout/tests/testthat.R @@ -0,0 +1,4 @@ +library(testthat) +library(ok) + +test_check("ok") diff --git a/tests/testthat/test-parallel/stdout/tests/testthat/test-stdout-1.R b/tests/testthat/test-parallel/stdout/tests/testthat/test-stdout-1.R new file mode 100644 index 000000000..c051c2dfc --- /dev/null +++ b/tests/testthat/test-parallel/stdout/tests/testthat/test-stdout-1.R @@ -0,0 +1,3 @@ +test_that("this is good", { + expect_equal(2 * 2, 4) +}) diff --git a/tests/testthat/test-parallel/stdout/tests/testthat/test-stdout-2.R b/tests/testthat/test-parallel/stdout/tests/testthat/test-stdout-2.R new file mode 100644 index 000000000..3096c244e --- /dev/null +++ b/tests/testthat/test-parallel/stdout/tests/testthat/test-stdout-2.R @@ -0,0 +1,4 @@ +test_that("this messages", { + message("This is a message!") + expect_true(TRUE) +}) diff --git a/tests/testthat/test-parallel/stdout/tests/testthat/test-stdout-3.R b/tests/testthat/test-parallel/stdout/tests/testthat/test-stdout-3.R new file mode 100644 index 000000000..a2ba80df2 --- /dev/null +++ b/tests/testthat/test-parallel/stdout/tests/testthat/test-stdout-3.R @@ -0,0 +1,4 @@ +test_that("this prints and skips", { + print(1:30) + skip(paste("This is", Sys.getpid())) +}) From 1c5ad3731e6646fabfbc5f13fd34c2e2b38b5209 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Tue, 5 Aug 2025 09:37:26 +0200 Subject: [PATCH 7/7] Remove a debug leftover --- tests/testthat/test-bare.R | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/testthat/test-bare.R b/tests/testthat/test-bare.R index 28818e646..84ba12892 100644 --- a/tests/testthat/test-bare.R +++ b/tests/testthat/test-bare.R @@ -14,7 +14,3 @@ stopifnot( expectation_failure = function(e) TRUE ) ) - -test_that("output", { - expect_true(TRUE) -})