Skip to content

Commit dd96289

Browse files
hadleygaborcsardi
andauthored
Forward stdout/stderr from parallel tests (#2163)
Co-authored-by: Gábor Csárdi <[email protected]>
1 parent 1154cbf commit dd96289

File tree

9 files changed

+120
-12
lines changed

9 files changed

+120
-12
lines changed

R/parallel-taskq.R

Lines changed: 55 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
PROCESS_DONE <- 200L
1313
PROCESS_STARTED <- 201L
1414
PROCESS_MSG <- 301L
15+
PROCESS_OUTPUT <- 302L
1516
PROCESS_EXITED <- 500L
1617
PROCESS_CRASHED <- 501L
1718
PROCESS_CLOSED <- 502L
@@ -50,7 +51,9 @@ task_q <- R6::R6Class(
5051
state = "waiting",
5152
fun = I(list(fun)),
5253
args = I(list(args)),
53-
worker = I(list(NULL))
54+
worker = I(list(NULL)),
55+
path = args[[1]],
56+
startup = I(list(NULL))
5457
)
5558
private$schedule()
5659
invisible(id)
@@ -62,15 +65,49 @@ task_q <- R6::R6Class(
6265
if (x == Inf) -1 else as.integer(as.double(x, "secs") * 1000)
6366
}
6467
repeat {
68+
pr <- vector(mode = "list", nrow(private$tasks))
6569
topoll <- which(private$tasks$state == "running")
66-
conns <- lapply(
70+
pr[topoll] <- processx::poll(
6771
private$tasks$worker[topoll],
68-
function(x) x$get_poll_connection()
72+
as_ms(timeout)
6973
)
70-
pr <- processx::poll(conns, as_ms(timeout))
71-
ready <- topoll[pr == "ready"]
72-
results <- lapply(ready, function(i) {
73-
msg <- private$tasks$worker[[i]]$read()
74+
results <- lapply(seq_along(pr), function(i) {
75+
# nothing from this worker?
76+
if (is.null(pr[[i]]) || all(pr[[i]] != "ready")) {
77+
return()
78+
}
79+
80+
# there is a testthat message?
81+
worker <- private$tasks$worker[[i]]
82+
msg <- if (pr[[i]][["process"]] == "ready") {
83+
worker$read()
84+
}
85+
86+
# there is an output message?
87+
has_output <- pr[[i]][["output"]] == "ready" ||
88+
pr[[i]][["error"]] == "ready"
89+
outmsg <- NULL
90+
if (has_output) {
91+
lns <- c(worker$read_output_lines(), worker$read_error_lines())
92+
inc <- paste0(worker$read_output(), worker$read_error())
93+
if (nchar(inc)) {
94+
lns <- c(lns, strsplit(inc, "\n", fixed = TRUE)[[1]])
95+
}
96+
# startup message?
97+
if (is.na(private$tasks$path[i])) {
98+
private$tasks$startup[[i]] <- c(private$tasks$startup[[i]], lns)
99+
} else {
100+
outmsg <- structure(
101+
list(
102+
code = PROCESS_OUTPUT,
103+
message = lns,
104+
path = private$tasks$path[i]
105+
),
106+
class = "testthat_message"
107+
)
108+
}
109+
}
110+
74111
## TODO: why can this be NULL?
75112
if (is.null(msg) || msg$code == PROCESS_MSG) {
76113
private$tasks$state[[i]] <- "running"
@@ -100,9 +137,10 @@ task_q <- R6::R6Class(
100137
class = c("testthat_process_error", "testthat_error")
101138
)
102139
}
103-
msg
140+
compact(list(msg, outmsg))
104141
})
105-
results <- results[!map_lgl(results, is.null)]
142+
# single list for all workers
143+
results <- compact(unlist(results, recursive = FALSE))
106144

107145
private$schedule()
108146
if (is.finite(timeout)) {
@@ -132,9 +170,11 @@ task_q <- R6::R6Class(
132170
state = "running",
133171
fun = nl,
134172
args = nl,
135-
worker = nl
173+
worker = nl,
174+
path = NA_character_,
175+
startup = nl
136176
)
137-
rsopts <- callr::r_session_options(...)
177+
rsopts <- callr::r_session_options(stdout = "|", stderr = "|", ...)
138178
for (i in seq_len(concurrency)) {
139179
rs <- callr::r_session$new(rsopts, wait = FALSE)
140180
private$tasks$worker[[i]] <- rs
@@ -176,7 +216,10 @@ task_q <- R6::R6Class(
176216
file <- private$tasks$args[[task_no]][[1]]
177217
if (is.null(fun)) {
178218
msg$error$stdout <- msg$stdout
179-
msg$error$stderr <- msg$stderr
219+
msg$error$stderr <- paste(
220+
c(private$tasks$startup[[task_no]], msg$stderr),
221+
collapse = "\n"
222+
)
180223
abort(
181224
paste0(
182225
"testthat subprocess failed to start, stderr:\n",

R/parallel.R

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,12 @@ parallel_event_loop_smooth <- function(queue, reporters, test_dir) {
143143

144144
updated <- FALSE
145145
for (x in msgs) {
146+
if (x$code == PROCESS_OUTPUT) {
147+
lns <- paste0("> ", x$path, ": ", x$message)
148+
cat("\n", file = stdout())
149+
base::writeLines(lns, stdout())
150+
next
151+
}
146152
if (x$code != PROCESS_MSG) {
147153
next
148154
}
@@ -178,6 +184,11 @@ parallel_event_loop_chunky <- function(queue, reporters, test_dir) {
178184
while (!queue$is_idle()) {
179185
msgs <- queue$poll(Inf)
180186
for (x in msgs) {
187+
if (x$code == PROCESS_OUTPUT) {
188+
lns <- paste0("> ", x$path, ": ", x$message)
189+
base::writeLines(lns, stdout())
190+
next
191+
}
181192
if (x$code != PROCESS_MSG) {
182193
next
183194
}

tests/testthat/test-parallel-stdout.R

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
test_that("stdout/stderr in parallel code", {
2+
skip_on_covr()
3+
withr::local_envvar(TESTTHAT_PARALLEL = "TRUE")
4+
out <- capture.output(suppressMessages(testthat::test_local(
5+
test_path("test-parallel", "stdout"),
6+
reporter = "summary"
7+
)))
8+
expect_true("> test-stdout-2.R: This is a message!" %in% out)
9+
expect_true(any(grepl("test-stdout-3.R: [1] 1 2 3", out, fixed = TRUE)))
10+
11+
out2 <- capture.output(suppressMessages(testthat::test_local(
12+
test_path("test-parallel", "stdout"),
13+
reporter = "progress"
14+
)))
15+
expect_true("> test-stdout-2.R: This is a message!" %in% out2)
16+
expect_true(any(grepl("test-stdout-3.R: [1] 1 2 3", out2, fixed = TRUE)))
17+
})
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
Package: setup
2+
Title: What the Package Does (One Line, Title Case)
3+
Version: 0.0.0.9000
4+
Authors@R:
5+
person(given = "First",
6+
family = "Last",
7+
role = c("aut", "cre"),
8+
email = "[email protected]",
9+
comment = c(ORCID = "YOUR-ORCID-ID"))
10+
Description: What the package does (one paragraph).
11+
License: `use_mit_license()`, `use_gpl3_license()` or friends to
12+
pick a license
13+
Encoding: UTF-8
14+
LazyData: true
15+
Roxygen: list(markdown = TRUE)
16+
RoxygenNote: 7.1.1
17+
Suggests:
18+
testthat
19+
Config/testthat/parallel: true
20+
Config/testthat/edition: 3
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
# Generated by roxygen2: do not edit by hand
2+
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
library(testthat)
2+
library(ok)
3+
4+
test_check("ok")
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
test_that("this is good", {
2+
expect_equal(2 * 2, 4)
3+
})
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
test_that("this messages", {
2+
message("This is a message!")
3+
expect_true(TRUE)
4+
})
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
test_that("this prints and skips", {
2+
print(1:30)
3+
skip(paste("This is", Sys.getpid()))
4+
})

0 commit comments

Comments
 (0)