Skip to content

Commit b084dea

Browse files
authored
feat: add crossbeam-channel for improved thread shutdown handling in cmd_run (#1050)
1 parent 94424a0 commit b084dea

File tree

4 files changed

+81
-12
lines changed

4 files changed

+81
-12
lines changed

core/src/ten_manager/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/src/ten_manager/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ zip = { version = "2.2", default-features = false, features = [
6868
"deflate",
6969
"time",
7070
] }
71+
crossbeam-channel = "0.5.15"
7172

7273
[target."cfg(unix)".dependencies]
7374
mimalloc = "0.1"

core/src/ten_manager/src/designer/exec/cmd_run.rs

Lines changed: 67 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,37 @@ use std::{process::Command, thread};
88

99
use actix::AsyncContext;
1010
use actix_web_actors::ws::WebsocketContext;
11+
use crossbeam_channel::{bounded, Sender};
1112

1213
use crate::designer::exec::RunCmdOutput;
1314

1415
use super::{msg::OutboundMsg, WsRunCmd};
1516

17+
// Add this struct to store shutdown senders.
18+
pub struct ShutdownSenders {
19+
pub stdout: Sender<()>,
20+
pub stderr: Sender<()>,
21+
pub wait: Sender<()>,
22+
}
23+
1624
impl WsRunCmd {
1725
pub fn cmd_run(
1826
&mut self,
1927
cmd: &String,
2028
ctx: &mut WebsocketContext<WsRunCmd>,
2129
) {
30+
// Create shutdown channels for each thread.
31+
let (stdout_shutdown_tx, stdout_shutdown_rx) = bounded::<()>(1);
32+
let (stderr_shutdown_tx, stderr_shutdown_rx) = bounded::<()>(1);
33+
let (wait_shutdown_tx, wait_shutdown_rx) = bounded::<()>(1);
34+
35+
// Store senders in the struct for later cleanup.
36+
self.shutdown_senders = Some(ShutdownSenders {
37+
stdout: stdout_shutdown_tx,
38+
stderr: stderr_shutdown_tx,
39+
wait: wait_shutdown_tx,
40+
});
41+
2242
let mut command = Command::new("sh");
2343
command
2444
.arg("-c")
@@ -61,12 +81,18 @@ impl WsRunCmd {
6181
// Read stdout.
6282
if let Some(mut out) = stdout_child {
6383
let addr_stdout = addr.clone();
84+
let shutdown_rx = stdout_shutdown_rx;
6485

6586
thread::spawn(move || {
6687
use std::io::{BufRead, BufReader};
6788

6889
let reader = BufReader::new(&mut out);
6990
for line_res in reader.lines() {
91+
// Check if we should terminate.
92+
if shutdown_rx.try_recv().is_ok() {
93+
break;
94+
}
95+
7096
match line_res {
7197
Ok(line) => {
7298
// `do_send` is used to asynchronously send messages
@@ -85,12 +111,18 @@ impl WsRunCmd {
85111
// Read stderr.
86112
if let Some(mut err) = stderr_child {
87113
let addr_stderr = addr.clone();
114+
let shutdown_rx = stderr_shutdown_rx;
88115

89116
thread::spawn(move || {
90117
use std::io::{BufRead, BufReader};
91118

92119
let reader = BufReader::new(&mut err);
93120
for line_res in reader.lines() {
121+
// Check if we should terminate.
122+
if shutdown_rx.try_recv().is_ok() {
123+
break;
124+
}
125+
94126
match line_res {
95127
Ok(line) => {
96128
addr_stderr.do_send(RunCmdOutput::StdErr(line));
@@ -105,15 +137,46 @@ impl WsRunCmd {
105137
// Wait for child exit in another thread.
106138
let addr2 = ctx.address();
107139
if let Some(mut child) = self.child.take() {
140+
let shutdown_rx = wait_shutdown_rx;
141+
108142
thread::spawn(move || {
109-
if let Ok(status) = child.wait() {
110-
addr2.do_send(RunCmdOutput::Exit(
111-
status.code().unwrap_or(-1),
112-
));
143+
let exit_status = crossbeam_channel::select! {
144+
recv(shutdown_rx) -> _ => {
145+
// Termination requested, kill the process.
146+
let _ = child.kill();
147+
let _ = child.wait();
148+
None
149+
},
150+
default => {
151+
// Normal wait path.
152+
match child.wait() {
153+
Ok(status) => Some(status.code().unwrap_or(-1)),
154+
Err(_) => None,
155+
}
156+
}
157+
};
158+
159+
if let Some(code) = exit_status {
160+
addr2.do_send(RunCmdOutput::Exit(code));
113161
} else {
114162
addr2.do_send(RunCmdOutput::Exit(-1));
115163
}
116164
});
117165
}
118166
}
167+
168+
// Call this when the actor is stopping or websocket is closing.
169+
pub fn cleanup_threads(&mut self) {
170+
// Signal all threads to terminate.
171+
if let Some(senders) = self.shutdown_senders.take() {
172+
let _ = senders.stdout.send(());
173+
let _ = senders.stderr.send(());
174+
let _ = senders.wait.send(());
175+
}
176+
177+
// Force kill child process if it exists.
178+
if let Some(mut child) = self.child.take() {
179+
let _ = child.kill();
180+
}
181+
}
119182
}

core/src/ten_manager/src/designer/exec/mod.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@ use actix_web::{web, Error, HttpRequest, HttpResponse};
1616
use actix_web_actors::ws;
1717
use anyhow::Context;
1818
use anyhow::Result;
19-
use msg::InboundMsg;
20-
use run_script::extract_command_from_manifest;
2119

2220
use crate::designer::DesignerState;
23-
21+
use cmd_run::ShutdownSenders;
22+
use msg::InboundMsg;
2423
use msg::OutboundMsg;
24+
use run_script::extract_command_from_manifest;
2525

2626
// The output (stdout, stderr) and exit status from the child process.
2727
#[derive(Message)]
@@ -51,11 +51,17 @@ pub struct WsRunCmd {
5151
child: Option<Child>,
5252
cmd_parser: CmdParser,
5353
working_directory: Option<String>,
54+
shutdown_senders: Option<ShutdownSenders>,
5455
}
5556

5657
impl WsRunCmd {
5758
pub fn new(cmd_parser: CmdParser) -> Self {
58-
Self { child: None, cmd_parser, working_directory: None }
59+
Self {
60+
child: None,
61+
cmd_parser,
62+
working_directory: None,
63+
shutdown_senders: None,
64+
}
5965
}
6066
}
6167

@@ -71,10 +77,8 @@ impl Actor for WsRunCmd {
7177
}
7278

7379
fn stopped(&mut self, _ctx: &mut Self::Context) {
74-
// If the process is still running, try to kill it.
75-
if let Some(mut child) = self.child.take() {
76-
let _ = child.kill();
77-
}
80+
// Call our new cleanup method to properly terminate all threads.
81+
self.cleanup_threads();
7882
}
7983
}
8084

0 commit comments

Comments
 (0)