Skip to content

Commit 4b4c170

Browse files
authored
refactor! enhance asynchronous handling in install functions with channel based logging (#1031)
1 parent 7c34f4d commit 4b4c170

File tree

49 files changed

+266
-159
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+266
-159
lines changed

core/src/ten_manager/src/cmd/cmd_designer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use crate::{
1717
constants::DESIGNER_BACKEND_DEFAULT_PORT,
1818
designer::{configure_routes, frontend::get_frontend_asset, DesignerState},
1919
fs::{check_is_app_folder, get_cwd},
20-
output::{TmanOutput, TmanOutputCli},
20+
output::{cli::TmanOutputCli, TmanOutput},
2121
pkg_info::get_all_pkgs::get_all_pkgs_in_app,
2222
};
2323

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
//
2+
// Copyright © 2025 Agora
3+
// This file is part of TEN Framework, an open source project.
4+
// Licensed under the Apache License, Version 2.0, with certain conditions.
5+
// Refer to the "LICENSE" file in the root directory for more information.
6+
//
7+
use std::sync::{mpsc, Arc};
8+
use std::thread;
9+
10+
use actix::{fut, AsyncContext};
11+
use actix_web_actors::ws::WebsocketContext;
12+
13+
use crate::cmd::cmd_install::InstallCommand;
14+
use crate::config::metadata::TmanMetadata;
15+
use crate::config::TmanConfig;
16+
use crate::designer::builtin_function::{
17+
BuiltinFunctionOutput, WsBuiltinFunction,
18+
};
19+
use crate::output::channel::TmanOutputChannel;
20+
use crate::output::TmanOutput;
21+
22+
pub fn run_installation(
23+
tman_config: Arc<tokio::sync::RwLock<TmanConfig>>,
24+
tman_metadata: Arc<tokio::sync::RwLock<TmanMetadata>>,
25+
install_command: InstallCommand,
26+
ctx: &mut WebsocketContext<WsBuiltinFunction>,
27+
) {
28+
let addr = ctx.address();
29+
30+
// Create a channel for cross-thread communication.
31+
let (sender, receiver) = mpsc::channel();
32+
33+
let output_channel = Arc::new(Box::new(TmanOutputChannel {
34+
sender: sender.clone(),
35+
}) as Box<dyn TmanOutput>);
36+
37+
// Run the installation process in a new thread.
38+
thread::spawn(move || {
39+
// Create a new Tokio runtime to execute asynchronous code.
40+
let rt = tokio::runtime::Builder::new_current_thread()
41+
.enable_all()
42+
.build()
43+
.unwrap();
44+
45+
// Execute the installation in the new runtime.
46+
let result = rt.block_on(async {
47+
crate::cmd::cmd_install::execute_cmd(
48+
tman_config,
49+
tman_metadata,
50+
install_command,
51+
output_channel,
52+
)
53+
.await
54+
});
55+
56+
// Send the completion status.
57+
let exit_code = if result.is_ok() { 0 } else { -1 };
58+
let error_message = if let Err(err) = result {
59+
Some(err.to_string())
60+
} else {
61+
None
62+
};
63+
64+
let _ = sender.send(format!(
65+
"EXIT:{}:{}",
66+
exit_code,
67+
error_message.unwrap_or_default()
68+
));
69+
});
70+
71+
// Start a local task in the main thread to listen to the message channel.
72+
let addr_clone = addr.clone();
73+
74+
// Use actix's fut::wrap_future to convert a standard Future to an
75+
// ActorFuture.
76+
ctx.spawn(fut::wrap_future::<_, WsBuiltinFunction>(async move {
77+
// Use a loop to poll the receiver.
78+
let mut continue_running = true;
79+
80+
while continue_running {
81+
match receiver.try_recv() {
82+
Ok(msg) => {
83+
if msg.starts_with("EXIT:") {
84+
// Parse the exit status.
85+
let parts: Vec<&str> = msg.splitn(3, ':').collect();
86+
if parts.len() >= 2 {
87+
let exit_code =
88+
parts[1].parse::<i32>().unwrap_or(-1);
89+
let error_message =
90+
if parts.len() > 2 && !parts[2].is_empty() {
91+
Some(parts[2].to_string())
92+
} else {
93+
None
94+
};
95+
96+
// Send the exit message.
97+
addr_clone.do_send(BuiltinFunctionOutput::Exit {
98+
exit_code,
99+
error_message,
100+
});
101+
continue_running = false;
102+
}
103+
} else if msg.starts_with("normal_line:") {
104+
// Parse and send normal logs.
105+
let content = msg.replacen("normal_line:", "", 1);
106+
addr_clone.do_send(BuiltinFunctionOutput::NormalLine(
107+
content,
108+
));
109+
} else if msg.starts_with("normal_partial:") {
110+
// Parse and send normal partial logs.
111+
let content = msg.replacen("normal_partial:", "", 1);
112+
addr_clone.do_send(
113+
BuiltinFunctionOutput::NormalPartial(content),
114+
);
115+
} else if msg.starts_with("error_line:") {
116+
// Parse and send error line logs.
117+
let content = msg.replacen("error_line:", "", 1);
118+
addr_clone
119+
.do_send(BuiltinFunctionOutput::ErrorLine(content));
120+
} else if msg.starts_with("error_partial:") {
121+
// Parse and send error partial logs.
122+
let content = msg.replacen("error_partial:", "", 1);
123+
addr_clone.do_send(
124+
BuiltinFunctionOutput::ErrorPartial(content),
125+
);
126+
}
127+
}
128+
Err(mpsc::TryRecvError::Empty) => {
129+
// No message, temporarily yield control.
130+
tokio::task::yield_now().await;
131+
}
132+
Err(mpsc::TryRecvError::Disconnected) => {
133+
// The sender has disconnected, exit the loop.
134+
continue_running = false;
135+
}
136+
}
137+
}
138+
}));
139+
}

core/src/ten_manager/src/designer/builtin_function/install.rs

Lines changed: 10 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,13 @@
44
// Licensed under the Apache License, Version 2.0, with certain conditions.
55
// Refer to the "LICENSE" file in the root directory for more information.
66
//
7-
use std::sync::Arc;
8-
9-
use actix::AsyncContext;
107
use actix_web_actors::ws::WebsocketContext;
11-
use tokio::task::LocalSet;
128

139
use ten_rust::pkg_info::manifest::support::ManifestSupport;
1410

15-
use super::{msg::TmanOutputWs, BuiltinFunctionOutput, WsBuiltinFunction};
16-
use crate::output::TmanOutput;
11+
use super::common::run_installation;
12+
use super::WsBuiltinFunction;
13+
use crate::cmd::cmd_install::LocalInstallMode;
1714

1815
impl WsBuiltinFunction {
1916
pub fn install(
@@ -37,52 +34,17 @@ impl WsBuiltinFunction {
3734
os: None,
3835
arch: None,
3936
},
40-
local_install_mode: crate::cmd::cmd_install::LocalInstallMode::Link,
37+
local_install_mode: LocalInstallMode::Link,
4138
standalone: false,
4239
local_path: None,
4340
cwd: base_dir.clone(),
4441
};
4542

46-
let addr = ctx.address();
47-
let output_ws: Arc<Box<dyn TmanOutput>> =
48-
Arc::new(Box::new(TmanOutputWs { addr: addr.clone() }));
49-
50-
let addr = addr.clone();
51-
52-
// Clone the tman_config to avoid borrowing self in the async task.
53-
let tman_config = self.tman_config.clone();
54-
let tman_metadata = self.tman_metadata.clone();
55-
56-
// Create a LocalSet to ensure spawn_local runs on this thread.
57-
let local = LocalSet::new();
58-
59-
// Spawn the task within the LocalSet context.
60-
local.spawn_local(async move {
61-
let result = crate::cmd::cmd_install::execute_cmd(
62-
tman_config,
63-
tman_metadata,
64-
install_command,
65-
output_ws,
66-
)
67-
.await;
68-
69-
// Notify the WebSocket client that the task is complete, and
70-
// determine the exit code based on the result.
71-
let exit_code = if result.is_ok() { 0 } else { -1 };
72-
let error_message = if let Err(err) = result {
73-
Some(err.to_string())
74-
} else {
75-
None
76-
};
77-
addr.do_send(BuiltinFunctionOutput::Exit {
78-
exit_code,
79-
error_message,
80-
});
81-
});
82-
83-
// Use spawn to run the LocalSet in the background.
84-
actix_web::rt::spawn(async move {
85-
local.await;
86-
});
43+
run_installation(
44+
self.tman_config.clone(),
45+
self.tman_metadata.clone(),
46+
install_command,
47+
ctx,
48+
);
8749
}
8850
}

core/src/ten_manager/src/designer/builtin_function/install_all.rs

Lines changed: 10 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,13 @@
44
// Licensed under the Apache License, Version 2.0, with certain conditions.
55
// Refer to the "LICENSE" file in the root directory for more information.
66
//
7-
use std::sync::Arc;
8-
9-
use actix::AsyncContext;
107
use actix_web_actors::ws::WebsocketContext;
11-
use tokio::task::LocalSet;
128

139
use ten_rust::pkg_info::manifest::support::ManifestSupport;
1410

15-
use super::{msg::TmanOutputWs, BuiltinFunctionOutput, WsBuiltinFunction};
16-
use crate::output::TmanOutput;
11+
use super::common::run_installation;
12+
use super::WsBuiltinFunction;
13+
use crate::cmd::cmd_install::LocalInstallMode;
1714

1815
impl WsBuiltinFunction {
1916
pub fn install_all(
@@ -28,52 +25,17 @@ impl WsBuiltinFunction {
2825
os: None,
2926
arch: None,
3027
},
31-
local_install_mode: crate::cmd::cmd_install::LocalInstallMode::Link,
28+
local_install_mode: LocalInstallMode::Link,
3229
standalone: false,
3330
local_path: None,
3431
cwd: base_dir.clone(),
3532
};
3633

37-
let addr = ctx.address();
38-
let output_ws: Arc<Box<dyn TmanOutput>> =
39-
Arc::new(Box::new(TmanOutputWs { addr: addr.clone() }));
40-
41-
let addr = addr.clone();
42-
43-
// Clone the config and metadata before the async block.
44-
let tman_config = self.tman_config.clone();
45-
let tman_metadata = self.tman_metadata.clone();
46-
47-
// Create a LocalSet to ensure spawn_local runs on this thread.
48-
let local = LocalSet::new();
49-
50-
// Spawn the task within the LocalSet context.
51-
local.spawn_local(async move {
52-
// Now perform the actual work.
53-
let result = crate::cmd::cmd_install::execute_cmd(
54-
tman_config,
55-
tman_metadata,
56-
install_command,
57-
output_ws,
58-
)
59-
.await;
60-
61-
// Notify the WebSocket client that the task is complete.
62-
let exit_code = if result.is_ok() { 0 } else { -1 };
63-
let error_message = if let Err(err) = result {
64-
Some(err.to_string())
65-
} else {
66-
None
67-
};
68-
addr.do_send(BuiltinFunctionOutput::Exit {
69-
exit_code,
70-
error_message,
71-
});
72-
});
73-
74-
// Use spawn to run the LocalSet in the background.
75-
actix_web::rt::spawn(async move {
76-
local.await;
77-
});
34+
run_installation(
35+
self.tman_config.clone(),
36+
self.tman_metadata.clone(),
37+
install_command,
38+
ctx,
39+
);
7840
}
7941
}

core/src/ten_manager/src/designer/builtin_function/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
// Licensed under the Apache License, Version 2.0, with certain conditions.
55
// Refer to the "LICENSE" file in the root directory for more information.
66
//
7+
mod common;
78
mod install;
89
mod install_all;
910
pub mod msg;

core/src/ten_manager/src/main.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,12 @@ use anyhow::Result;
1111
use console::Emoji;
1212
use ten_manager::cmd::execute_cmd;
1313
use ten_manager::config::metadata::TmanMetadata;
14+
use ten_manager::output::cli::TmanOutputCli;
1415
use tokio::runtime::Runtime;
1516

1617
use ten_manager::cmd_line;
1718
use ten_manager::constants::GITHUB_RELEASE_PAGE;
18-
use ten_manager::output::{TmanOutput, TmanOutputCli};
19+
use ten_manager::output::TmanOutput;
1920
use ten_manager::version::VERSION;
2021
use ten_manager::version_utils::check_update;
2122

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
//
2+
// Copyright © 2025 Agora
3+
// This file is part of TEN Framework, an open source project.
4+
// Licensed under the Apache License, Version 2.0, with certain conditions.
5+
// Refer to the "LICENSE" file in the root directory for more information.
6+
//
7+
use std::sync::mpsc;
8+
9+
use super::TmanOutput;
10+
11+
// A TmanOutput implementation to send logs to the channel.
12+
#[derive(Clone)]
13+
pub struct TmanOutputChannel {
14+
pub sender: mpsc::Sender<String>,
15+
}
16+
17+
impl TmanOutput for TmanOutputChannel {
18+
fn normal_line(&self, text: &str) {
19+
let _ = self.sender.send(format!("normal_line:{}", text));
20+
}
21+
22+
fn normal_partial(&self, text: &str) {
23+
let _ = self.sender.send(format!("normal_partial:{}", text));
24+
}
25+
26+
fn error_line(&self, text: &str) {
27+
let _ = self.sender.send(format!("error_line:{}", text));
28+
}
29+
30+
fn error_partial(&self, text: &str) {
31+
let _ = self.sender.send(format!("error_partial:{}", text));
32+
}
33+
34+
fn is_interactive(&self) -> bool {
35+
false
36+
}
37+
}

core/src/ten_manager/src/output.rs renamed to core/src/ten_manager/src/output/cli.rs

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,7 @@
44
// Licensed under the Apache License, Version 2.0, with certain conditions.
55
// Refer to the "LICENSE" file in the root directory for more information.
66
//
7-
8-
/// Abstract all log output methods: CLI, WebSocket, etc.
9-
pub trait TmanOutput: Send + Sync {
10-
/// General information.
11-
fn normal_line(&self, text: &str);
12-
fn normal_partial(&self, text: &str);
13-
14-
/// Error information.
15-
fn error_line(&self, text: &str);
16-
fn error_partial(&self, text: &str);
17-
18-
/// Whether it is interactive (e.g., can block waiting for user input in CLI
19-
/// environment).
20-
fn is_interactive(&self) -> bool;
21-
}
7+
use super::TmanOutput;
228

239
/// Output for CLI: directly println / eprintln.
2410
pub struct TmanOutputCli;

0 commit comments

Comments
 (0)