From b1f859479caeaaf886729c1b9290d430c8fb788e Mon Sep 17 00:00:00 2001 From: Hu Yueh-Wei Date: Sun, 20 Apr 2025 14:31:54 +0800 Subject: [PATCH 1/3] refactor: enhance asynchronous handling in install functions with channel-based logging --- .../src/designer/builtin_function/install.rs | 162 ++++++++++++++---- .../designer/builtin_function/install_all.rs | 159 +++++++++++++---- 2 files changed, 262 insertions(+), 59 deletions(-) diff --git a/core/src/ten_manager/src/designer/builtin_function/install.rs b/core/src/ten_manager/src/designer/builtin_function/install.rs index 4b42c4831..92608d63c 100644 --- a/core/src/ten_manager/src/designer/builtin_function/install.rs +++ b/core/src/ten_manager/src/designer/builtin_function/install.rs @@ -4,15 +4,15 @@ // Licensed under the Apache License, Version 2.0, with certain conditions. // Refer to the "LICENSE" file in the root directory for more information. // -use std::sync::Arc; +use std::sync::{mpsc, Arc}; +use std::thread; -use actix::AsyncContext; +use actix::{fut, AsyncContext}; use actix_web_actors::ws::WebsocketContext; -use tokio::task::LocalSet; use ten_rust::pkg_info::manifest::support::ManifestSupport; -use super::{msg::TmanOutputWs, BuiltinFunctionOutput, WsBuiltinFunction}; +use super::{BuiltinFunctionOutput, WsBuiltinFunction}; use crate::output::TmanOutput; impl WsBuiltinFunction { @@ -44,45 +44,147 @@ impl WsBuiltinFunction { }; let addr = ctx.address(); - let output_ws: Arc> = - Arc::new(Box::new(TmanOutputWs { addr: addr.clone() })); - let addr = addr.clone(); + // Create a channel for cross-thread communication. + let (sender, receiver) = mpsc::channel(); - // Clone the tman_config to avoid borrowing self in the async task. + // Create a custom TmanOutput implementation to send logs to the + // channel. + #[derive(Clone)] + struct ChannelOutput { + sender: mpsc::Sender, + } + + impl TmanOutput for ChannelOutput { + fn normal_line(&self, text: &str) { + let _ = self.sender.send(format!("normal_line:{}", text)); + } + + fn normal_partial(&self, text: &str) { + let _ = self.sender.send(format!("normal_partial:{}", text)); + } + + fn error_line(&self, text: &str) { + let _ = self.sender.send(format!("error_line:{}", text)); + } + + fn error_partial(&self, text: &str) { + let _ = self.sender.send(format!("error_partial:{}", text)); + } + + fn is_interactive(&self) -> bool { + false + } + } + + let output_channel = Arc::new(Box::new(ChannelOutput { + sender: sender.clone(), + }) as Box); + + // 克隆安装所需的配置 let tman_config = self.tman_config.clone(); let tman_metadata = self.tman_metadata.clone(); - // Create a LocalSet to ensure spawn_local runs on this thread. - let local = LocalSet::new(); - - // Spawn the task within the LocalSet context. - local.spawn_local(async move { - let result = crate::cmd::cmd_install::execute_cmd( - tman_config, - tman_metadata, - install_command, - output_ws, - ) - .await; - - // Notify the WebSocket client that the task is complete, and - // determine the exit code based on the result. + // 在新线程中运行安装过程 + thread::spawn(move || { + // 创建一个新的 Tokio 运行时来执行异步代码 + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + // 在新的运行时中执行安装 + let result = rt.block_on(async { + crate::cmd::cmd_install::execute_cmd( + tman_config, + tman_metadata, + install_command, + output_channel, + ) + .await + }); + + // 发送完成状态 let exit_code = if result.is_ok() { 0 } else { -1 }; let error_message = if let Err(err) = result { Some(err.to_string()) } else { None }; - addr.do_send(BuiltinFunctionOutput::Exit { + let _ = sender.send(format!( + "EXIT:{}:{}", exit_code, - error_message, - }); + error_message.unwrap_or_default() + )); }); - // Use spawn to run the LocalSet in the background. - actix_web::rt::spawn(async move { - local.await; - }); + // 在主线程中启动一个本地任务来监听消息通道 + let addr_clone = addr.clone(); + + // 使用actix的fut::wrap_future将标准Future转换为ActorFuture + ctx.spawn(fut::wrap_future::<_, Self>(async move { + // 使用循环轮询接收器 + let mut continue_running = true; + while continue_running { + match receiver.try_recv() { + Ok(msg) => { + if msg.starts_with("EXIT:") { + // 解析退出状态 + let parts: Vec<&str> = msg.splitn(3, ':').collect(); + if parts.len() >= 2 { + let exit_code = + parts[1].parse::().unwrap_or(-1); + let error_message = if parts.len() > 2 + && !parts[2].is_empty() + { + Some(parts[2].to_string()) + } else { + None + }; + + // 发送退出消息 + addr_clone.do_send( + BuiltinFunctionOutput::Exit { + exit_code, + error_message, + }, + ); + continue_running = false; + } + } else if msg.starts_with("normal_line:") { + // 解析并发送正常日志 + let content = msg.replacen("normal_line:", "", 1); + addr_clone.do_send( + BuiltinFunctionOutput::NormalLine(content), + ); + } else if msg.starts_with("normal_partial:") { + let content = + msg.replacen("normal_partial:", "", 1); + addr_clone.do_send( + BuiltinFunctionOutput::NormalPartial(content), + ); + } else if msg.starts_with("error_line:") { + let content = msg.replacen("error_line:", "", 1); + addr_clone.do_send( + BuiltinFunctionOutput::ErrorLine(content), + ); + } else if msg.starts_with("error_partial:") { + let content = msg.replacen("error_partial:", "", 1); + addr_clone.do_send( + BuiltinFunctionOutput::ErrorPartial(content), + ); + } + } + Err(mpsc::TryRecvError::Empty) => { + // 没有消息,暂时让出控制权 + tokio::task::yield_now().await; + } + Err(mpsc::TryRecvError::Disconnected) => { + // 发送方已断开,退出循环 + continue_running = false; + } + } + } + })); } } diff --git a/core/src/ten_manager/src/designer/builtin_function/install_all.rs b/core/src/ten_manager/src/designer/builtin_function/install_all.rs index b6bf4c512..110daf323 100644 --- a/core/src/ten_manager/src/designer/builtin_function/install_all.rs +++ b/core/src/ten_manager/src/designer/builtin_function/install_all.rs @@ -4,15 +4,15 @@ // Licensed under the Apache License, Version 2.0, with certain conditions. // Refer to the "LICENSE" file in the root directory for more information. // -use std::sync::Arc; +use std::sync::{mpsc, Arc}; +use std::thread; -use actix::AsyncContext; +use actix::{fut, AsyncContext}; use actix_web_actors::ws::WebsocketContext; -use tokio::task::LocalSet; use ten_rust::pkg_info::manifest::support::ManifestSupport; -use super::{msg::TmanOutputWs, BuiltinFunctionOutput, WsBuiltinFunction}; +use super::{BuiltinFunctionOutput, WsBuiltinFunction}; use crate::output::TmanOutput; impl WsBuiltinFunction { @@ -35,45 +35,146 @@ impl WsBuiltinFunction { }; let addr = ctx.address(); - let output_ws: Arc> = - Arc::new(Box::new(TmanOutputWs { addr: addr.clone() })); - let addr = addr.clone(); + // 创建通道,用于跨线程通信 + let (sender, receiver) = mpsc::channel(); + + // 创建自定义的 TmanOutput 实现,将日志发送到通道 + #[derive(Clone)] + struct ChannelOutput { + sender: mpsc::Sender, + } + + impl TmanOutput for ChannelOutput { + fn normal_line(&self, text: &str) { + let _ = self.sender.send(format!("normal_line:{}", text)); + } + + fn normal_partial(&self, text: &str) { + let _ = self.sender.send(format!("normal_partial:{}", text)); + } + + fn error_line(&self, text: &str) { + let _ = self.sender.send(format!("error_line:{}", text)); + } + + fn error_partial(&self, text: &str) { + let _ = self.sender.send(format!("error_partial:{}", text)); + } + + fn is_interactive(&self) -> bool { + false + } + } + + let output_channel = Arc::new(Box::new(ChannelOutput { + sender: sender.clone(), + }) as Box); // Clone the config and metadata before the async block. let tman_config = self.tman_config.clone(); let tman_metadata = self.tman_metadata.clone(); - // Create a LocalSet to ensure spawn_local runs on this thread. - let local = LocalSet::new(); - - // Spawn the task within the LocalSet context. - local.spawn_local(async move { - // Now perform the actual work. - let result = crate::cmd::cmd_install::execute_cmd( - tman_config, - tman_metadata, - install_command, - output_ws, - ) - .await; - - // Notify the WebSocket client that the task is complete. + // 在新线程中运行安装过程 + thread::spawn(move || { + // 创建一个新的 Tokio 运行时来执行异步代码 + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + // 在新的运行时中执行安装 + let result = rt.block_on(async { + crate::cmd::cmd_install::execute_cmd( + tman_config, + tman_metadata, + install_command, + output_channel, + ) + .await + }); + + // 发送完成状态 let exit_code = if result.is_ok() { 0 } else { -1 }; let error_message = if let Err(err) = result { Some(err.to_string()) } else { None }; - addr.do_send(BuiltinFunctionOutput::Exit { + let _ = sender.send(format!( + "EXIT:{}:{}", exit_code, - error_message, - }); + error_message.unwrap_or_default() + )); }); - // Use spawn to run the LocalSet in the background. - actix_web::rt::spawn(async move { - local.await; - }); + // 在主线程中启动一个本地任务来监听消息通道 + let addr_clone = addr.clone(); + + // 使用actix的fut::wrap_future将标准Future转换为ActorFuture + ctx.spawn(fut::wrap_future::<_, Self>(async move { + // 使用循环轮询接收器 + let mut continue_running = true; + while continue_running { + match receiver.try_recv() { + Ok(msg) => { + if msg.starts_with("EXIT:") { + // 解析退出状态 + let parts: Vec<&str> = msg.splitn(3, ':').collect(); + if parts.len() >= 2 { + let exit_code = + parts[1].parse::().unwrap_or(-1); + let error_message = if parts.len() > 2 + && !parts[2].is_empty() + { + Some(parts[2].to_string()) + } else { + None + }; + + // 发送退出消息 + addr_clone.do_send( + BuiltinFunctionOutput::Exit { + exit_code, + error_message, + }, + ); + continue_running = false; + } + } else if msg.starts_with("normal_line:") { + // 解析并发送正常日志 + let content = msg.replacen("normal_line:", "", 1); + addr_clone.do_send( + BuiltinFunctionOutput::NormalLine(content), + ); + } else if msg.starts_with("normal_partial:") { + let content = + msg.replacen("normal_partial:", "", 1); + addr_clone.do_send( + BuiltinFunctionOutput::NormalPartial(content), + ); + } else if msg.starts_with("error_line:") { + let content = msg.replacen("error_line:", "", 1); + addr_clone.do_send( + BuiltinFunctionOutput::ErrorLine(content), + ); + } else if msg.starts_with("error_partial:") { + let content = msg.replacen("error_partial:", "", 1); + addr_clone.do_send( + BuiltinFunctionOutput::ErrorPartial(content), + ); + } + } + Err(mpsc::TryRecvError::Empty) => { + // 没有消息,暂时让出控制权 + tokio::task::yield_now().await; + } + Err(mpsc::TryRecvError::Disconnected) => { + // 发送方已断开,退出循环 + continue_running = false; + } + } + } + })); } } From a5da132ad6da1bf50753758ff816b12829ee9b35 Mon Sep 17 00:00:00 2001 From: Hu Yueh-Wei Date: Sun, 20 Apr 2025 14:44:40 +0800 Subject: [PATCH 2/3] fix: refine codes --- core/src/ten_manager/src/cmd/cmd_designer.rs | 2 +- .../src/designer/builtin_function/install.rs | 65 +++++++------------ .../designer/builtin_function/install_all.rs | 59 +++++------------ core/src/ten_manager/src/main.rs | 3 +- core/src/ten_manager/src/output/channel.rs | 37 +++++++++++ .../src/{output.rs => output/cli.rs} | 16 +---- core/src/ten_manager/src/output/mod.rs | 23 +++++++ .../tests/test_case/common/designer_state.rs | 2 +- .../tests/test_case/designer/apps/addons.rs | 2 +- .../tests/test_case/designer/apps/create.rs | 2 +- .../tests/test_case/designer/apps/get.rs | 2 +- .../tests/test_case/designer/apps/load.rs | 2 +- .../tests/test_case/designer/apps/reload.rs | 2 +- .../test_case/designer/apps/schema/mod.rs | 2 +- .../tests/test_case/designer/apps/unload.rs | 2 +- .../designer/builtin_function/install_all.rs | 2 +- .../designer/builtin_function/mod.rs | 2 +- .../tests/test_case/designer/dir_list/mod.rs | 2 +- .../tests/test_case/designer/doc_link/mod.rs | 2 +- .../tests/test_case/designer/env/mod.rs | 2 +- .../designer/extensions/property/mod.rs | 2 +- .../designer/extensions/schema/mod.rs | 2 +- .../tests/test_case/designer/get_apps.rs | 2 +- .../tests/test_case/designer/get_graphs.rs | 2 +- .../designer/get_packages_scripts.rs | 2 +- .../designer/graphs/connections/add.rs | 2 +- .../designer/graphs/connections/delete.rs | 2 +- .../designer/graphs/connections/get.rs | 2 +- .../msg_conversion/add_with_msg_conversion.rs | 2 +- .../modify_with_msg_conversion.rs | 2 +- .../tests/test_case/designer/graphs/get.rs | 2 +- .../test_case/designer/graphs/nodes/add.rs | 2 +- .../test_case/designer/graphs/nodes/delete.rs | 2 +- .../test_case/designer/graphs/nodes/get.rs | 2 +- .../designer/graphs/nodes/property/modify.rs | 2 +- .../designer/graphs/nodes/replace.rs | 2 +- .../tests/test_case/designer/graphs/update.rs | 2 +- .../tests/test_case/designer/help_text/mod.rs | 2 +- .../tests/test_case/designer/load_apps.rs | 2 +- .../test_case/designer/manifest/validate.rs | 2 +- .../test_case/designer/messages/compatible.rs | 2 +- .../tests/test_case/designer/metadata/mod.rs | 10 +-- .../test_case/designer/preferences/mod.rs | 2 +- .../test_case/designer/property/validate.rs | 2 +- .../tests/test_case/designer/reload_apps.rs | 2 +- .../test_case/designer/template_pkgs/mod.rs | 2 +- .../tests/test_case/designer/version/mod.rs | 2 +- 47 files changed, 145 insertions(+), 148 deletions(-) create mode 100644 core/src/ten_manager/src/output/channel.rs rename core/src/ten_manager/src/{output.rs => output/cli.rs} (61%) create mode 100644 core/src/ten_manager/src/output/mod.rs diff --git a/core/src/ten_manager/src/cmd/cmd_designer.rs b/core/src/ten_manager/src/cmd/cmd_designer.rs index caac341ff..0ae7e596d 100644 --- a/core/src/ten_manager/src/cmd/cmd_designer.rs +++ b/core/src/ten_manager/src/cmd/cmd_designer.rs @@ -17,7 +17,7 @@ use crate::{ constants::DESIGNER_BACKEND_DEFAULT_PORT, designer::{configure_routes, frontend::get_frontend_asset, DesignerState}, fs::{check_is_app_folder, get_cwd}, - output::{TmanOutput, TmanOutputCli}, + output::{cli::TmanOutputCli, TmanOutput}, pkg_info::get_all_pkgs::get_all_pkgs_in_app, }; diff --git a/core/src/ten_manager/src/designer/builtin_function/install.rs b/core/src/ten_manager/src/designer/builtin_function/install.rs index 92608d63c..b3f4d1c2a 100644 --- a/core/src/ten_manager/src/designer/builtin_function/install.rs +++ b/core/src/ten_manager/src/designer/builtin_function/install.rs @@ -13,6 +13,7 @@ use actix_web_actors::ws::WebsocketContext; use ten_rust::pkg_info::manifest::support::ManifestSupport; use super::{BuiltinFunctionOutput, WsBuiltinFunction}; +use crate::output::channel::TmanOutputChannel; use crate::output::TmanOutput; impl WsBuiltinFunction { @@ -48,52 +49,23 @@ impl WsBuiltinFunction { // Create a channel for cross-thread communication. let (sender, receiver) = mpsc::channel(); - // Create a custom TmanOutput implementation to send logs to the - // channel. - #[derive(Clone)] - struct ChannelOutput { - sender: mpsc::Sender, - } - - impl TmanOutput for ChannelOutput { - fn normal_line(&self, text: &str) { - let _ = self.sender.send(format!("normal_line:{}", text)); - } - - fn normal_partial(&self, text: &str) { - let _ = self.sender.send(format!("normal_partial:{}", text)); - } - - fn error_line(&self, text: &str) { - let _ = self.sender.send(format!("error_line:{}", text)); - } - - fn error_partial(&self, text: &str) { - let _ = self.sender.send(format!("error_partial:{}", text)); - } - - fn is_interactive(&self) -> bool { - false - } - } - - let output_channel = Arc::new(Box::new(ChannelOutput { + let output_channel = Arc::new(Box::new(TmanOutputChannel { sender: sender.clone(), }) as Box); - // 克隆安装所需的配置 + // Clone the configuration required for installation. let tman_config = self.tman_config.clone(); let tman_metadata = self.tman_metadata.clone(); - // 在新线程中运行安装过程 + // Run the installation process in a new thread. thread::spawn(move || { - // 创建一个新的 Tokio 运行时来执行异步代码 + // Create a new Tokio runtime to execute asynchronous code. let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); - // 在新的运行时中执行安装 + // Execute the installation in the new runtime. let result = rt.block_on(async { crate::cmd::cmd_install::execute_cmd( tman_config, @@ -104,13 +76,14 @@ impl WsBuiltinFunction { .await }); - // 发送完成状态 + // Send the completion status. let exit_code = if result.is_ok() { 0 } else { -1 }; let error_message = if let Err(err) = result { Some(err.to_string()) } else { None }; + let _ = sender.send(format!( "EXIT:{}:{}", exit_code, @@ -118,18 +91,21 @@ impl WsBuiltinFunction { )); }); - // 在主线程中启动一个本地任务来监听消息通道 + // Start a local task in the main thread to listen to the message + // channel. let addr_clone = addr.clone(); - // 使用actix的fut::wrap_future将标准Future转换为ActorFuture + // Use actix's fut::wrap_future to convert a standard Future to an + // ActorFuture. ctx.spawn(fut::wrap_future::<_, Self>(async move { - // 使用循环轮询接收器 + // Use a loop to poll the receiver. let mut continue_running = true; + while continue_running { match receiver.try_recv() { Ok(msg) => { if msg.starts_with("EXIT:") { - // 解析退出状态 + // Parse the exit status. let parts: Vec<&str> = msg.splitn(3, ':').collect(); if parts.len() >= 2 { let exit_code = @@ -142,7 +118,7 @@ impl WsBuiltinFunction { None }; - // 发送退出消息 + // Send the exit message. addr_clone.do_send( BuiltinFunctionOutput::Exit { exit_code, @@ -152,23 +128,26 @@ impl WsBuiltinFunction { continue_running = false; } } else if msg.starts_with("normal_line:") { - // 解析并发送正常日志 + // Parse and send normal logs. let content = msg.replacen("normal_line:", "", 1); addr_clone.do_send( BuiltinFunctionOutput::NormalLine(content), ); } else if msg.starts_with("normal_partial:") { + // Parse and send normal partial logs. let content = msg.replacen("normal_partial:", "", 1); addr_clone.do_send( BuiltinFunctionOutput::NormalPartial(content), ); } else if msg.starts_with("error_line:") { + // Parse and send error line logs. let content = msg.replacen("error_line:", "", 1); addr_clone.do_send( BuiltinFunctionOutput::ErrorLine(content), ); } else if msg.starts_with("error_partial:") { + // Parse and send error partial logs. let content = msg.replacen("error_partial:", "", 1); addr_clone.do_send( BuiltinFunctionOutput::ErrorPartial(content), @@ -176,11 +155,11 @@ impl WsBuiltinFunction { } } Err(mpsc::TryRecvError::Empty) => { - // 没有消息,暂时让出控制权 + // No message, temporarily yield control. tokio::task::yield_now().await; } Err(mpsc::TryRecvError::Disconnected) => { - // 发送方已断开,退出循环 + // The sender has disconnected, exit the loop. continue_running = false; } } diff --git a/core/src/ten_manager/src/designer/builtin_function/install_all.rs b/core/src/ten_manager/src/designer/builtin_function/install_all.rs index 110daf323..1ed24cb6d 100644 --- a/core/src/ten_manager/src/designer/builtin_function/install_all.rs +++ b/core/src/ten_manager/src/designer/builtin_function/install_all.rs @@ -13,6 +13,7 @@ use actix_web_actors::ws::WebsocketContext; use ten_rust::pkg_info::manifest::support::ManifestSupport; use super::{BuiltinFunctionOutput, WsBuiltinFunction}; +use crate::output::channel::TmanOutputChannel; use crate::output::TmanOutput; impl WsBuiltinFunction { @@ -36,38 +37,10 @@ impl WsBuiltinFunction { let addr = ctx.address(); - // 创建通道,用于跨线程通信 + // Create a channel for cross-thread communication. let (sender, receiver) = mpsc::channel(); - // 创建自定义的 TmanOutput 实现,将日志发送到通道 - #[derive(Clone)] - struct ChannelOutput { - sender: mpsc::Sender, - } - - impl TmanOutput for ChannelOutput { - fn normal_line(&self, text: &str) { - let _ = self.sender.send(format!("normal_line:{}", text)); - } - - fn normal_partial(&self, text: &str) { - let _ = self.sender.send(format!("normal_partial:{}", text)); - } - - fn error_line(&self, text: &str) { - let _ = self.sender.send(format!("error_line:{}", text)); - } - - fn error_partial(&self, text: &str) { - let _ = self.sender.send(format!("error_partial:{}", text)); - } - - fn is_interactive(&self) -> bool { - false - } - } - - let output_channel = Arc::new(Box::new(ChannelOutput { + let output_channel = Arc::new(Box::new(TmanOutputChannel { sender: sender.clone(), }) as Box); @@ -75,15 +48,15 @@ impl WsBuiltinFunction { let tman_config = self.tman_config.clone(); let tman_metadata = self.tman_metadata.clone(); - // 在新线程中运行安装过程 + // Run the installation process in a new thread. thread::spawn(move || { - // 创建一个新的 Tokio 运行时来执行异步代码 + // Create a new Tokio runtime to execute asynchronous code. let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); - // 在新的运行时中执行安装 + // Execute the installation in the new runtime. let result = rt.block_on(async { crate::cmd::cmd_install::execute_cmd( tman_config, @@ -94,7 +67,7 @@ impl WsBuiltinFunction { .await }); - // 发送完成状态 + // Send the completion status. let exit_code = if result.is_ok() { 0 } else { -1 }; let error_message = if let Err(err) = result { Some(err.to_string()) @@ -108,18 +81,20 @@ impl WsBuiltinFunction { )); }); - // 在主线程中启动一个本地任务来监听消息通道 + // Start a local task in the main thread to listen to the message + // channel. let addr_clone = addr.clone(); - // 使用actix的fut::wrap_future将标准Future转换为ActorFuture + // Use actix's fut::wrap_future to convert a standard Future to an + // ActorFuture. ctx.spawn(fut::wrap_future::<_, Self>(async move { - // 使用循环轮询接收器 + // Use a loop to poll the receiver. let mut continue_running = true; while continue_running { match receiver.try_recv() { Ok(msg) => { if msg.starts_with("EXIT:") { - // 解析退出状态 + // Parse the exit status. let parts: Vec<&str> = msg.splitn(3, ':').collect(); if parts.len() >= 2 { let exit_code = @@ -132,7 +107,7 @@ impl WsBuiltinFunction { None }; - // 发送退出消息 + // Send the exit message. addr_clone.do_send( BuiltinFunctionOutput::Exit { exit_code, @@ -142,7 +117,7 @@ impl WsBuiltinFunction { continue_running = false; } } else if msg.starts_with("normal_line:") { - // 解析并发送正常日志 + // Parse and send normal logs. let content = msg.replacen("normal_line:", "", 1); addr_clone.do_send( BuiltinFunctionOutput::NormalLine(content), @@ -166,11 +141,11 @@ impl WsBuiltinFunction { } } Err(mpsc::TryRecvError::Empty) => { - // 没有消息,暂时让出控制权 + // No message, temporarily yield control. tokio::task::yield_now().await; } Err(mpsc::TryRecvError::Disconnected) => { - // 发送方已断开,退出循环 + // The sender has disconnected, exit the loop. continue_running = false; } } diff --git a/core/src/ten_manager/src/main.rs b/core/src/ten_manager/src/main.rs index 94d641651..c55c5ec46 100644 --- a/core/src/ten_manager/src/main.rs +++ b/core/src/ten_manager/src/main.rs @@ -11,11 +11,12 @@ use anyhow::Result; use console::Emoji; use ten_manager::cmd::execute_cmd; use ten_manager::config::metadata::TmanMetadata; +use ten_manager::output::cli::TmanOutputCli; use tokio::runtime::Runtime; use ten_manager::cmd_line; use ten_manager::constants::GITHUB_RELEASE_PAGE; -use ten_manager::output::{TmanOutput, TmanOutputCli}; +use ten_manager::output::TmanOutput; use ten_manager::version::VERSION; use ten_manager::version_utils::check_update; diff --git a/core/src/ten_manager/src/output/channel.rs b/core/src/ten_manager/src/output/channel.rs new file mode 100644 index 000000000..f89b8fd90 --- /dev/null +++ b/core/src/ten_manager/src/output/channel.rs @@ -0,0 +1,37 @@ +// +// Copyright © 2025 Agora +// This file is part of TEN Framework, an open source project. +// Licensed under the Apache License, Version 2.0, with certain conditions. +// Refer to the "LICENSE" file in the root directory for more information. +// +use std::sync::mpsc; + +use super::TmanOutput; + +// A TmanOutput implementation to send logs to the channel. +#[derive(Clone)] +pub struct TmanOutputChannel { + pub sender: mpsc::Sender, +} + +impl TmanOutput for TmanOutputChannel { + fn normal_line(&self, text: &str) { + let _ = self.sender.send(format!("normal_line:{}", text)); + } + + fn normal_partial(&self, text: &str) { + let _ = self.sender.send(format!("normal_partial:{}", text)); + } + + fn error_line(&self, text: &str) { + let _ = self.sender.send(format!("error_line:{}", text)); + } + + fn error_partial(&self, text: &str) { + let _ = self.sender.send(format!("error_partial:{}", text)); + } + + fn is_interactive(&self) -> bool { + false + } +} diff --git a/core/src/ten_manager/src/output.rs b/core/src/ten_manager/src/output/cli.rs similarity index 61% rename from core/src/ten_manager/src/output.rs rename to core/src/ten_manager/src/output/cli.rs index 7ad2a4254..7bab71d7f 100644 --- a/core/src/ten_manager/src/output.rs +++ b/core/src/ten_manager/src/output/cli.rs @@ -4,21 +4,7 @@ // Licensed under the Apache License, Version 2.0, with certain conditions. // Refer to the "LICENSE" file in the root directory for more information. // - -/// Abstract all log output methods: CLI, WebSocket, etc. -pub trait TmanOutput: Send + Sync { - /// General information. - fn normal_line(&self, text: &str); - fn normal_partial(&self, text: &str); - - /// Error information. - fn error_line(&self, text: &str); - fn error_partial(&self, text: &str); - - /// Whether it is interactive (e.g., can block waiting for user input in CLI - /// environment). - fn is_interactive(&self) -> bool; -} +use super::TmanOutput; /// Output for CLI: directly println / eprintln. pub struct TmanOutputCli; diff --git a/core/src/ten_manager/src/output/mod.rs b/core/src/ten_manager/src/output/mod.rs new file mode 100644 index 000000000..42377883a --- /dev/null +++ b/core/src/ten_manager/src/output/mod.rs @@ -0,0 +1,23 @@ +// +// Copyright © 2025 Agora +// This file is part of TEN Framework, an open source project. +// Licensed under the Apache License, Version 2.0, with certain conditions. +// Refer to the "LICENSE" file in the root directory for more information. +// +pub mod channel; +pub mod cli; + +/// Abstract all log output methods: CLI, WebSocket, etc. +pub trait TmanOutput: Send + Sync { + /// General information. + fn normal_line(&self, text: &str); + fn normal_partial(&self, text: &str); + + /// Error information. + fn error_line(&self, text: &str); + fn error_partial(&self, text: &str); + + /// Whether it is interactive (e.g., can block waiting for user input in CLI + /// environment). + fn is_interactive(&self) -> bool; +} diff --git a/core/src/ten_manager/tests/test_case/common/designer_state.rs b/core/src/ten_manager/tests/test_case/common/designer_state.rs index 043c501bd..724f1fabf 100644 --- a/core/src/ten_manager/tests/test_case/common/designer_state.rs +++ b/core/src/ten_manager/tests/test_case/common/designer_state.rs @@ -9,7 +9,7 @@ use std::{collections::HashMap, sync::Arc}; use ten_manager::{ config::{metadata::TmanMetadata, read_config, TmanConfig}, designer::DesignerState, - output::TmanOutputCli, + output::cli::TmanOutputCli, }; use super::tman_config::find_config_json; diff --git a/core/src/ten_manager/tests/test_case/designer/apps/addons.rs b/core/src/ten_manager/tests/test_case/designer/apps/addons.rs index 53693f2e2..d33566a42 100644 --- a/core/src/ten_manager/tests/test_case/designer/apps/addons.rs +++ b/core/src/ten_manager/tests/test_case/designer/apps/addons.rs @@ -20,7 +20,7 @@ mod tests { }; use ten_manager::designer::response::ApiResponse; use ten_manager::designer::DesignerState; - use ten_manager::output::TmanOutputCli; + use ten_manager::output::cli::TmanOutputCli; use crate::test_case::common::mock::inject_all_pkgs_for_mock; diff --git a/core/src/ten_manager/tests/test_case/designer/apps/create.rs b/core/src/ten_manager/tests/test_case/designer/apps/create.rs index fa3f6dd5c..fdc96d9e5 100644 --- a/core/src/ten_manager/tests/test_case/designer/apps/create.rs +++ b/core/src/ten_manager/tests/test_case/designer/apps/create.rs @@ -19,7 +19,7 @@ mod tests { create_app_endpoint, CreateAppRequestPayload, }; use ten_manager::designer::DesignerState; - use ten_manager::output::TmanOutputCli; + use ten_manager::output::cli::TmanOutputCli; #[actix_web::test] async fn test_create_app_success() { diff --git a/core/src/ten_manager/tests/test_case/designer/apps/get.rs b/core/src/ten_manager/tests/test_case/designer/apps/get.rs index d75cf009b..2902eb332 100644 --- a/core/src/ten_manager/tests/test_case/designer/apps/get.rs +++ b/core/src/ten_manager/tests/test_case/designer/apps/get.rs @@ -20,7 +20,7 @@ mod tests { }; use ten_manager::designer::response::{ApiResponse, Status}; use ten_manager::designer::DesignerState; - use ten_manager::output::TmanOutputCli; + use ten_manager::output::cli::TmanOutputCli; #[actix_web::test] async fn test_get_apps_some() { diff --git a/core/src/ten_manager/tests/test_case/designer/apps/load.rs b/core/src/ten_manager/tests/test_case/designer/apps/load.rs index 40cbb30ad..d8a5d9dc2 100644 --- a/core/src/ten_manager/tests/test_case/designer/apps/load.rs +++ b/core/src/ten_manager/tests/test_case/designer/apps/load.rs @@ -18,7 +18,7 @@ mod tests { }; use ten_manager::designer::response::ApiResponse; use ten_manager::designer::DesignerState; - use ten_manager::output::TmanOutputCli; + use ten_manager::output::cli::TmanOutputCli; #[actix_web::test] async fn test_load_app_fail() { diff --git a/core/src/ten_manager/tests/test_case/designer/apps/reload.rs b/core/src/ten_manager/tests/test_case/designer/apps/reload.rs index dc867f6d6..6f7319d83 100644 --- a/core/src/ten_manager/tests/test_case/designer/apps/reload.rs +++ b/core/src/ten_manager/tests/test_case/designer/apps/reload.rs @@ -18,7 +18,7 @@ mod tests { response::{ErrorResponse, Status}, DesignerState, }, - output::TmanOutputCli, + output::cli::TmanOutputCli, }; use ten_rust::base_dir_pkg_info::PkgsInfoInApp; diff --git a/core/src/ten_manager/tests/test_case/designer/apps/schema/mod.rs b/core/src/ten_manager/tests/test_case/designer/apps/schema/mod.rs index 3bcc45319..757ee20c1 100644 --- a/core/src/ten_manager/tests/test_case/designer/apps/schema/mod.rs +++ b/core/src/ten_manager/tests/test_case/designer/apps/schema/mod.rs @@ -20,7 +20,7 @@ mod tests { response::{ApiResponse, Status}, DesignerState, }, - output::TmanOutputCli, + output::cli::TmanOutputCli, }; use crate::test_case::common::mock::inject_all_pkgs_for_mock; diff --git a/core/src/ten_manager/tests/test_case/designer/apps/unload.rs b/core/src/ten_manager/tests/test_case/designer/apps/unload.rs index 43d8a373a..dd7c4d6c8 100644 --- a/core/src/ten_manager/tests/test_case/designer/apps/unload.rs +++ b/core/src/ten_manager/tests/test_case/designer/apps/unload.rs @@ -20,7 +20,7 @@ mod tests { }; use ten_manager::designer::response::{ApiResponse, ErrorResponse, Status}; use ten_manager::designer::DesignerState; - use ten_manager::output::TmanOutputCli; + use ten_manager::output::cli::TmanOutputCli; #[actix_web::test] async fn test_unload_app_success() { diff --git a/core/src/ten_manager/tests/test_case/designer/builtin_function/install_all.rs b/core/src/ten_manager/tests/test_case/designer/builtin_function/install_all.rs index 02d82b699..be3a93b9b 100644 --- a/core/src/ten_manager/tests/test_case/designer/builtin_function/install_all.rs +++ b/core/src/ten_manager/tests/test_case/designer/builtin_function/install_all.rs @@ -15,7 +15,7 @@ use ten_manager::{ designer::builtin_function::{builtin_function_endpoint, msg::InboundMsg}, }; use ten_manager::{ - config::TmanConfig, designer::DesignerState, output::TmanOutputCli, + config::TmanConfig, designer::DesignerState, output::cli::TmanOutputCli, }; use crate::test_case::common::builtin_server::start_test_server; diff --git a/core/src/ten_manager/tests/test_case/designer/builtin_function/mod.rs b/core/src/ten_manager/tests/test_case/designer/builtin_function/mod.rs index 42a6123dc..5557612df 100644 --- a/core/src/ten_manager/tests/test_case/designer/builtin_function/mod.rs +++ b/core/src/ten_manager/tests/test_case/designer/builtin_function/mod.rs @@ -16,7 +16,7 @@ use actix_web::{ use ten_manager::{ config::{metadata::TmanMetadata, TmanConfig}, designer::{builtin_function::builtin_function_endpoint, DesignerState}, - output::TmanOutputCli, + output::cli::TmanOutputCli, }; #[actix_rt::test] diff --git a/core/src/ten_manager/tests/test_case/designer/dir_list/mod.rs b/core/src/ten_manager/tests/test_case/designer/dir_list/mod.rs index 837921834..c2602488b 100644 --- a/core/src/ten_manager/tests/test_case/designer/dir_list/mod.rs +++ b/core/src/ten_manager/tests/test_case/designer/dir_list/mod.rs @@ -23,7 +23,7 @@ mod tests { response::{ApiResponse, Status}, DesignerState, }, - output::TmanOutputCli, + output::cli::TmanOutputCli, }; #[derive(Serialize, Deserialize, Debug, Clone)] diff --git a/core/src/ten_manager/tests/test_case/designer/doc_link/mod.rs b/core/src/ten_manager/tests/test_case/designer/doc_link/mod.rs index 86d45a7e0..9bf5c95df 100644 --- a/core/src/ten_manager/tests/test_case/designer/doc_link/mod.rs +++ b/core/src/ten_manager/tests/test_case/designer/doc_link/mod.rs @@ -19,7 +19,7 @@ use ten_manager::{ response::ApiResponse, DesignerState, }, - output::TmanOutputCli, + output::cli::TmanOutputCli, }; #[actix_web::test] diff --git a/core/src/ten_manager/tests/test_case/designer/env/mod.rs b/core/src/ten_manager/tests/test_case/designer/env/mod.rs index af259c1d0..2dfcccdbe 100644 --- a/core/src/ten_manager/tests/test_case/designer/env/mod.rs +++ b/core/src/ten_manager/tests/test_case/designer/env/mod.rs @@ -16,7 +16,7 @@ mod tests { use ten_manager::{ config::{metadata::TmanMetadata, TmanConfig}, designer::{env::get_env_endpoint, DesignerState}, - output::TmanOutputCli, + output::cli::TmanOutputCli, }; #[actix_web::test] diff --git a/core/src/ten_manager/tests/test_case/designer/extensions/property/mod.rs b/core/src/ten_manager/tests/test_case/designer/extensions/property/mod.rs index d4fe1f501..8eae63525 100644 --- a/core/src/ten_manager/tests/test_case/designer/extensions/property/mod.rs +++ b/core/src/ten_manager/tests/test_case/designer/extensions/property/mod.rs @@ -20,7 +20,7 @@ mod tests { }; use ten_manager::designer::response::{ApiResponse, Status}; use ten_manager::designer::DesignerState; - use ten_manager::output::TmanOutputCli; + use ten_manager::output::cli::TmanOutputCli; use crate::test_case::common::mock::{ inject_all_pkgs_for_mock, inject_all_standard_pkgs_for_mock, diff --git a/core/src/ten_manager/tests/test_case/designer/extensions/schema/mod.rs b/core/src/ten_manager/tests/test_case/designer/extensions/schema/mod.rs index f7869cddd..afade6a92 100644 --- a/core/src/ten_manager/tests/test_case/designer/extensions/schema/mod.rs +++ b/core/src/ten_manager/tests/test_case/designer/extensions/schema/mod.rs @@ -21,7 +21,7 @@ mod tests { response::{ApiResponse, Status}, DesignerState, }, - output::TmanOutputCli, + output::cli::TmanOutputCli, }; use crate::test_case::common::mock::inject_all_standard_pkgs_for_mock; diff --git a/core/src/ten_manager/tests/test_case/designer/get_apps.rs b/core/src/ten_manager/tests/test_case/designer/get_apps.rs index b6b82c865..b99464394 100644 --- a/core/src/ten_manager/tests/test_case/designer/get_apps.rs +++ b/core/src/ten_manager/tests/test_case/designer/get_apps.rs @@ -16,7 +16,7 @@ mod tests { response::{ApiResponse, Status}, DesignerState, }, - output::TmanOutputCli, + output::cli::TmanOutputCli, pkg_info::get_all_pkgs::get_all_pkgs_in_app, }; diff --git a/core/src/ten_manager/tests/test_case/designer/get_graphs.rs b/core/src/ten_manager/tests/test_case/designer/get_graphs.rs index c599c4b41..7934c189a 100644 --- a/core/src/ten_manager/tests/test_case/designer/get_graphs.rs +++ b/core/src/ten_manager/tests/test_case/designer/get_graphs.rs @@ -25,7 +25,7 @@ use ten_manager::{ response::ApiResponse, DesignerState, }, - output::TmanOutputCli, + output::cli::TmanOutputCli, }; use crate::test_case::common::mock::inject_all_pkgs_for_mock; diff --git a/core/src/ten_manager/tests/test_case/designer/get_packages_scripts.rs b/core/src/ten_manager/tests/test_case/designer/get_packages_scripts.rs index 0fbc2768e..fbbab2229 100644 --- a/core/src/ten_manager/tests/test_case/designer/get_packages_scripts.rs +++ b/core/src/ten_manager/tests/test_case/designer/get_packages_scripts.rs @@ -20,7 +20,7 @@ mod tests { response::{ApiResponse, Status}, DesignerState, }, - output::TmanOutputCli, + output::cli::TmanOutputCli, pkg_info::get_all_pkgs::get_all_pkgs_in_app, }; diff --git a/core/src/ten_manager/tests/test_case/designer/graphs/connections/add.rs b/core/src/ten_manager/tests/test_case/designer/graphs/connections/add.rs index 9da066f1e..441cdb8a9 100644 --- a/core/src/ten_manager/tests/test_case/designer/graphs/connections/add.rs +++ b/core/src/ten_manager/tests/test_case/designer/graphs/connections/add.rs @@ -21,7 +21,7 @@ mod tests { DesignerState, }, graph::graphs_cache_find_by_name, - output::TmanOutputCli, + output::cli::TmanOutputCli, }; use ten_rust::pkg_info::{ constants::PROPERTY_JSON_FILENAME, message::MsgType, diff --git a/core/src/ten_manager/tests/test_case/designer/graphs/connections/delete.rs b/core/src/ten_manager/tests/test_case/designer/graphs/connections/delete.rs index 4ebfc68cc..e40951072 100644 --- a/core/src/ten_manager/tests/test_case/designer/graphs/connections/delete.rs +++ b/core/src/ten_manager/tests/test_case/designer/graphs/connections/delete.rs @@ -28,7 +28,7 @@ mod tests { DesignerState, }, graph::{graphs_cache_find_by_id, graphs_cache_find_by_name}, - output::TmanOutputCli, + output::cli::TmanOutputCli, }; use crate::test_case::common::mock::{ diff --git a/core/src/ten_manager/tests/test_case/designer/graphs/connections/get.rs b/core/src/ten_manager/tests/test_case/designer/graphs/connections/get.rs index d7e55c3f4..a83d8c8e0 100644 --- a/core/src/ten_manager/tests/test_case/designer/graphs/connections/get.rs +++ b/core/src/ten_manager/tests/test_case/designer/graphs/connections/get.rs @@ -24,7 +24,7 @@ mod tests { response::ApiResponse, DesignerState, }, - output::TmanOutputCli, + output::cli::TmanOutputCli, }; use crate::test_case::common::mock::{ diff --git a/core/src/ten_manager/tests/test_case/designer/graphs/connections/msg_conversion/add_with_msg_conversion.rs b/core/src/ten_manager/tests/test_case/designer/graphs/connections/msg_conversion/add_with_msg_conversion.rs index b659d614f..6d97a3113 100644 --- a/core/src/ten_manager/tests/test_case/designer/graphs/connections/msg_conversion/add_with_msg_conversion.rs +++ b/core/src/ten_manager/tests/test_case/designer/graphs/connections/msg_conversion/add_with_msg_conversion.rs @@ -21,7 +21,7 @@ mod tests { DesignerState, }, graph::graphs_cache_find_by_name, - output::TmanOutputCli, + output::cli::TmanOutputCli, }; use ten_rust::{ graph::msg_conversion::{ diff --git a/core/src/ten_manager/tests/test_case/designer/graphs/connections/msg_conversion/modify_with_msg_conversion.rs b/core/src/ten_manager/tests/test_case/designer/graphs/connections/msg_conversion/modify_with_msg_conversion.rs index 2a2e92288..a1b609325 100644 --- a/core/src/ten_manager/tests/test_case/designer/graphs/connections/msg_conversion/modify_with_msg_conversion.rs +++ b/core/src/ten_manager/tests/test_case/designer/graphs/connections/msg_conversion/modify_with_msg_conversion.rs @@ -27,7 +27,7 @@ mod tests { DesignerState, }, graph::graphs_cache_find_by_name, - output::TmanOutputCli, + output::cli::TmanOutputCli, }; use ten_rust::{ graph::msg_conversion::{ diff --git a/core/src/ten_manager/tests/test_case/designer/graphs/get.rs b/core/src/ten_manager/tests/test_case/designer/graphs/get.rs index ee2191d4a..43573dcf5 100644 --- a/core/src/ten_manager/tests/test_case/designer/graphs/get.rs +++ b/core/src/ten_manager/tests/test_case/designer/graphs/get.rs @@ -20,7 +20,7 @@ mod tests { response::ApiResponse, DesignerState, }, - output::TmanOutputCli, + output::cli::TmanOutputCli, }; use crate::test_case::common::mock::inject_all_standard_pkgs_for_mock; diff --git a/core/src/ten_manager/tests/test_case/designer/graphs/nodes/add.rs b/core/src/ten_manager/tests/test_case/designer/graphs/nodes/add.rs index e9e32b4b9..54edbba22 100644 --- a/core/src/ten_manager/tests/test_case/designer/graphs/nodes/add.rs +++ b/core/src/ten_manager/tests/test_case/designer/graphs/nodes/add.rs @@ -22,7 +22,7 @@ mod tests { DesignerState, }, graph::graphs_cache_find_by_name, - output::TmanOutputCli, + output::cli::TmanOutputCli, }; use ten_rust::pkg_info::{ constants::{MANIFEST_JSON_FILENAME, PROPERTY_JSON_FILENAME}, diff --git a/core/src/ten_manager/tests/test_case/designer/graphs/nodes/delete.rs b/core/src/ten_manager/tests/test_case/designer/graphs/nodes/delete.rs index 3319568ae..937d6a964 100644 --- a/core/src/ten_manager/tests/test_case/designer/graphs/nodes/delete.rs +++ b/core/src/ten_manager/tests/test_case/designer/graphs/nodes/delete.rs @@ -29,7 +29,7 @@ mod tests { DesignerState, }, graph::{graphs_cache_find_by_id, graphs_cache_find_by_name}, - output::TmanOutputCli, + output::cli::TmanOutputCli, }; use crate::test_case::common::mock::{ diff --git a/core/src/ten_manager/tests/test_case/designer/graphs/nodes/get.rs b/core/src/ten_manager/tests/test_case/designer/graphs/nodes/get.rs index e896aa933..213ca07bd 100644 --- a/core/src/ten_manager/tests/test_case/designer/graphs/nodes/get.rs +++ b/core/src/ten_manager/tests/test_case/designer/graphs/nodes/get.rs @@ -20,7 +20,7 @@ mod tests { response::{ApiResponse, ErrorResponse}, DesignerState, }, - output::TmanOutputCli, + output::cli::TmanOutputCli, }; use uuid::Uuid; diff --git a/core/src/ten_manager/tests/test_case/designer/graphs/nodes/property/modify.rs b/core/src/ten_manager/tests/test_case/designer/graphs/nodes/property/modify.rs index f58d8ab02..6ae3c698b 100644 --- a/core/src/ten_manager/tests/test_case/designer/graphs/nodes/property/modify.rs +++ b/core/src/ten_manager/tests/test_case/designer/graphs/nodes/property/modify.rs @@ -23,7 +23,7 @@ mod tests { DesignerState, }, graph::graphs_cache_find_by_name, - output::TmanOutputCli, + output::cli::TmanOutputCli, }; use ten_rust::pkg_info::constants::{ MANIFEST_JSON_FILENAME, PROPERTY_JSON_FILENAME, diff --git a/core/src/ten_manager/tests/test_case/designer/graphs/nodes/replace.rs b/core/src/ten_manager/tests/test_case/designer/graphs/nodes/replace.rs index c6c5eb250..4bec02982 100644 --- a/core/src/ten_manager/tests/test_case/designer/graphs/nodes/replace.rs +++ b/core/src/ten_manager/tests/test_case/designer/graphs/nodes/replace.rs @@ -21,7 +21,7 @@ mod tests { DesignerState, }, graph::graphs_cache_find_by_name, - output::TmanOutputCli, + output::cli::TmanOutputCli, }; use ten_rust::pkg_info::constants::{ MANIFEST_JSON_FILENAME, PROPERTY_JSON_FILENAME, diff --git a/core/src/ten_manager/tests/test_case/designer/graphs/update.rs b/core/src/ten_manager/tests/test_case/designer/graphs/update.rs index 9216264d6..d426d291b 100644 --- a/core/src/ten_manager/tests/test_case/designer/graphs/update.rs +++ b/core/src/ten_manager/tests/test_case/designer/graphs/update.rs @@ -20,7 +20,7 @@ mod tests { DesignerState, }, graph::graphs_cache_find_by_name, - output::TmanOutputCli, + output::cli::TmanOutputCli, }; use ten_rust::{ graph::{ diff --git a/core/src/ten_manager/tests/test_case/designer/help_text/mod.rs b/core/src/ten_manager/tests/test_case/designer/help_text/mod.rs index ab6c4a695..96cd44e1f 100644 --- a/core/src/ten_manager/tests/test_case/designer/help_text/mod.rs +++ b/core/src/ten_manager/tests/test_case/designer/help_text/mod.rs @@ -19,7 +19,7 @@ use ten_manager::{ response::ApiResponse, DesignerState, }, - output::TmanOutputCli, + output::cli::TmanOutputCli, }; #[actix_web::test] diff --git a/core/src/ten_manager/tests/test_case/designer/load_apps.rs b/core/src/ten_manager/tests/test_case/designer/load_apps.rs index 967add009..a6cea8bbd 100644 --- a/core/src/ten_manager/tests/test_case/designer/load_apps.rs +++ b/core/src/ten_manager/tests/test_case/designer/load_apps.rs @@ -18,7 +18,7 @@ mod tests { response::{ApiResponse, Status}, DesignerState, }, - output::TmanOutputCli, + output::cli::TmanOutputCli, }; #[actix_web::test] diff --git a/core/src/ten_manager/tests/test_case/designer/manifest/validate.rs b/core/src/ten_manager/tests/test_case/designer/manifest/validate.rs index 6208f9deb..b7e06521e 100644 --- a/core/src/ten_manager/tests/test_case/designer/manifest/validate.rs +++ b/core/src/ten_manager/tests/test_case/designer/manifest/validate.rs @@ -18,7 +18,7 @@ use ten_manager::{ response::{ApiResponse, Status}, DesignerState, }, - output::TmanOutputCli, + output::cli::TmanOutputCli, }; #[actix_rt::test] diff --git a/core/src/ten_manager/tests/test_case/designer/messages/compatible.rs b/core/src/ten_manager/tests/test_case/designer/messages/compatible.rs index 8900b5687..6a0a05e82 100644 --- a/core/src/ten_manager/tests/test_case/designer/messages/compatible.rs +++ b/core/src/ten_manager/tests/test_case/designer/messages/compatible.rs @@ -21,7 +21,7 @@ use ten_manager::{ response::ApiResponse, DesignerState, }, - output::TmanOutputCli, + output::cli::TmanOutputCli, }; use ten_rust::pkg_info::message::{MsgDirection, MsgType}; diff --git a/core/src/ten_manager/tests/test_case/designer/metadata/mod.rs b/core/src/ten_manager/tests/test_case/designer/metadata/mod.rs index 6dbaecb6e..1522a3a68 100644 --- a/core/src/ten_manager/tests/test_case/designer/metadata/mod.rs +++ b/core/src/ten_manager/tests/test_case/designer/metadata/mod.rs @@ -30,7 +30,7 @@ use ten_manager::{ response::ApiResponse, DesignerState, }, - output::TmanOutputCli, + output::cli::TmanOutputCli, }; #[actix_web::test] @@ -220,9 +220,7 @@ async fn test_update_graph_ui() { let designer_state = DesignerState { tman_config: Arc::new(tokio::sync::RwLock::new(TmanConfig::default())), - tman_metadata: Arc::new(tokio::sync::RwLock::new( - tman_metadata, - )), + tman_metadata: Arc::new(tokio::sync::RwLock::new(tman_metadata)), out: Arc::new(Box::new(TmanOutputCli)), pkgs_cache: tokio::sync::RwLock::new(HashMap::new()), graphs_cache: tokio::sync::RwLock::new(HashMap::new()), @@ -322,9 +320,7 @@ async fn test_get_nonexistent_graph_ui() { let designer_state = DesignerState { tman_config: Arc::new(tokio::sync::RwLock::new(TmanConfig::default())), - tman_metadata: Arc::new(tokio::sync::RwLock::new( - tman_metadata, - )), + tman_metadata: Arc::new(tokio::sync::RwLock::new(tman_metadata)), out: Arc::new(Box::new(TmanOutputCli)), pkgs_cache: tokio::sync::RwLock::new(HashMap::new()), graphs_cache: tokio::sync::RwLock::new(HashMap::new()), diff --git a/core/src/ten_manager/tests/test_case/designer/preferences/mod.rs b/core/src/ten_manager/tests/test_case/designer/preferences/mod.rs index 35daf8803..9f6309762 100644 --- a/core/src/ten_manager/tests/test_case/designer/preferences/mod.rs +++ b/core/src/ten_manager/tests/test_case/designer/preferences/mod.rs @@ -29,7 +29,7 @@ use ten_manager::{ }, DesignerState, }, - output::TmanOutputCli, + output::cli::TmanOutputCli, }; // Tests from get.rs diff --git a/core/src/ten_manager/tests/test_case/designer/property/validate.rs b/core/src/ten_manager/tests/test_case/designer/property/validate.rs index f8fa648cf..21aab07fc 100644 --- a/core/src/ten_manager/tests/test_case/designer/property/validate.rs +++ b/core/src/ten_manager/tests/test_case/designer/property/validate.rs @@ -17,7 +17,7 @@ mod tests { }, DesignerState, }, - output::TmanOutputCli, + output::cli::TmanOutputCli, }; #[actix_web::test] diff --git a/core/src/ten_manager/tests/test_case/designer/reload_apps.rs b/core/src/ten_manager/tests/test_case/designer/reload_apps.rs index 4f26bb2c9..63c565107 100644 --- a/core/src/ten_manager/tests/test_case/designer/reload_apps.rs +++ b/core/src/ten_manager/tests/test_case/designer/reload_apps.rs @@ -16,7 +16,7 @@ mod tests { response::{ApiResponse, Status}, DesignerState, }, - output::TmanOutputCli, + output::cli::TmanOutputCli, pkg_info::get_all_pkgs::get_all_pkgs_in_app, }; diff --git a/core/src/ten_manager/tests/test_case/designer/template_pkgs/mod.rs b/core/src/ten_manager/tests/test_case/designer/template_pkgs/mod.rs index de2ce1a6d..75de9dddd 100644 --- a/core/src/ten_manager/tests/test_case/designer/template_pkgs/mod.rs +++ b/core/src/ten_manager/tests/test_case/designer/template_pkgs/mod.rs @@ -19,7 +19,7 @@ use ten_manager::{ }, DesignerState, }, - output::TmanOutputCli, + output::cli::TmanOutputCli, }; use ten_rust::pkg_info::pkg_type::PkgType; diff --git a/core/src/ten_manager/tests/test_case/designer/version/mod.rs b/core/src/ten_manager/tests/test_case/designer/version/mod.rs index 0043b79b4..47b58ef7e 100644 --- a/core/src/ten_manager/tests/test_case/designer/version/mod.rs +++ b/core/src/ten_manager/tests/test_case/designer/version/mod.rs @@ -18,7 +18,7 @@ mod tests { designer::{ response::ApiResponse, version::get_version_endpoint, DesignerState, }, - output::TmanOutputCli, + output::cli::TmanOutputCli, version::VERSION, }; From ca4f01dd6169fb5ce2e224482057d318fb41b7be Mon Sep 17 00:00:00 2001 From: Hu Yueh-Wei Date: Sun, 20 Apr 2025 14:53:37 +0800 Subject: [PATCH 3/3] fix: refine codes --- .../src/designer/builtin_function/common.rs | 139 ++++++++++++++++++ .../src/designer/builtin_function/install.rs | 139 ++---------------- .../designer/builtin_function/install_all.rs | 134 ++--------------- .../src/designer/builtin_function/mod.rs | 1 + 4 files changed, 160 insertions(+), 253 deletions(-) create mode 100644 core/src/ten_manager/src/designer/builtin_function/common.rs diff --git a/core/src/ten_manager/src/designer/builtin_function/common.rs b/core/src/ten_manager/src/designer/builtin_function/common.rs new file mode 100644 index 000000000..aa4b7ca90 --- /dev/null +++ b/core/src/ten_manager/src/designer/builtin_function/common.rs @@ -0,0 +1,139 @@ +// +// Copyright © 2025 Agora +// This file is part of TEN Framework, an open source project. +// Licensed under the Apache License, Version 2.0, with certain conditions. +// Refer to the "LICENSE" file in the root directory for more information. +// +use std::sync::{mpsc, Arc}; +use std::thread; + +use actix::{fut, AsyncContext}; +use actix_web_actors::ws::WebsocketContext; + +use crate::cmd::cmd_install::InstallCommand; +use crate::config::metadata::TmanMetadata; +use crate::config::TmanConfig; +use crate::designer::builtin_function::{ + BuiltinFunctionOutput, WsBuiltinFunction, +}; +use crate::output::channel::TmanOutputChannel; +use crate::output::TmanOutput; + +pub fn run_installation( + tman_config: Arc>, + tman_metadata: Arc>, + install_command: InstallCommand, + ctx: &mut WebsocketContext, +) { + let addr = ctx.address(); + + // Create a channel for cross-thread communication. + let (sender, receiver) = mpsc::channel(); + + let output_channel = Arc::new(Box::new(TmanOutputChannel { + sender: sender.clone(), + }) as Box); + + // Run the installation process in a new thread. + thread::spawn(move || { + // Create a new Tokio runtime to execute asynchronous code. + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + // Execute the installation in the new runtime. + let result = rt.block_on(async { + crate::cmd::cmd_install::execute_cmd( + tman_config, + tman_metadata, + install_command, + output_channel, + ) + .await + }); + + // Send the completion status. + let exit_code = if result.is_ok() { 0 } else { -1 }; + let error_message = if let Err(err) = result { + Some(err.to_string()) + } else { + None + }; + + let _ = sender.send(format!( + "EXIT:{}:{}", + exit_code, + error_message.unwrap_or_default() + )); + }); + + // Start a local task in the main thread to listen to the message channel. + let addr_clone = addr.clone(); + + // Use actix's fut::wrap_future to convert a standard Future to an + // ActorFuture. + ctx.spawn(fut::wrap_future::<_, WsBuiltinFunction>(async move { + // Use a loop to poll the receiver. + let mut continue_running = true; + + while continue_running { + match receiver.try_recv() { + Ok(msg) => { + if msg.starts_with("EXIT:") { + // Parse the exit status. + let parts: Vec<&str> = msg.splitn(3, ':').collect(); + if parts.len() >= 2 { + let exit_code = + parts[1].parse::().unwrap_or(-1); + let error_message = + if parts.len() > 2 && !parts[2].is_empty() { + Some(parts[2].to_string()) + } else { + None + }; + + // Send the exit message. + addr_clone.do_send(BuiltinFunctionOutput::Exit { + exit_code, + error_message, + }); + continue_running = false; + } + } else if msg.starts_with("normal_line:") { + // Parse and send normal logs. + let content = msg.replacen("normal_line:", "", 1); + addr_clone.do_send(BuiltinFunctionOutput::NormalLine( + content, + )); + } else if msg.starts_with("normal_partial:") { + // Parse and send normal partial logs. + let content = msg.replacen("normal_partial:", "", 1); + addr_clone.do_send( + BuiltinFunctionOutput::NormalPartial(content), + ); + } else if msg.starts_with("error_line:") { + // Parse and send error line logs. + let content = msg.replacen("error_line:", "", 1); + addr_clone + .do_send(BuiltinFunctionOutput::ErrorLine(content)); + } else if msg.starts_with("error_partial:") { + // Parse and send error partial logs. + let content = msg.replacen("error_partial:", "", 1); + addr_clone.do_send( + BuiltinFunctionOutput::ErrorPartial(content), + ); + } + } + Err(mpsc::TryRecvError::Empty) => { + // No message, temporarily yield control. + tokio::task::yield_now().await; + } + Err(mpsc::TryRecvError::Disconnected) => { + // The sender has disconnected, exit the loop. + continue_running = false; + } + } + } + })); +} diff --git a/core/src/ten_manager/src/designer/builtin_function/install.rs b/core/src/ten_manager/src/designer/builtin_function/install.rs index b3f4d1c2a..cfa2f4d1b 100644 --- a/core/src/ten_manager/src/designer/builtin_function/install.rs +++ b/core/src/ten_manager/src/designer/builtin_function/install.rs @@ -4,17 +4,13 @@ // Licensed under the Apache License, Version 2.0, with certain conditions. // Refer to the "LICENSE" file in the root directory for more information. // -use std::sync::{mpsc, Arc}; -use std::thread; - -use actix::{fut, AsyncContext}; use actix_web_actors::ws::WebsocketContext; use ten_rust::pkg_info::manifest::support::ManifestSupport; -use super::{BuiltinFunctionOutput, WsBuiltinFunction}; -use crate::output::channel::TmanOutputChannel; -use crate::output::TmanOutput; +use super::common::run_installation; +use super::WsBuiltinFunction; +use crate::cmd::cmd_install::LocalInstallMode; impl WsBuiltinFunction { pub fn install( @@ -38,132 +34,17 @@ impl WsBuiltinFunction { os: None, arch: None, }, - local_install_mode: crate::cmd::cmd_install::LocalInstallMode::Link, + local_install_mode: LocalInstallMode::Link, standalone: false, local_path: None, cwd: base_dir.clone(), }; - let addr = ctx.address(); - - // Create a channel for cross-thread communication. - let (sender, receiver) = mpsc::channel(); - - let output_channel = Arc::new(Box::new(TmanOutputChannel { - sender: sender.clone(), - }) as Box); - - // Clone the configuration required for installation. - let tman_config = self.tman_config.clone(); - let tman_metadata = self.tman_metadata.clone(); - - // Run the installation process in a new thread. - thread::spawn(move || { - // Create a new Tokio runtime to execute asynchronous code. - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - - // Execute the installation in the new runtime. - let result = rt.block_on(async { - crate::cmd::cmd_install::execute_cmd( - tman_config, - tman_metadata, - install_command, - output_channel, - ) - .await - }); - - // Send the completion status. - let exit_code = if result.is_ok() { 0 } else { -1 }; - let error_message = if let Err(err) = result { - Some(err.to_string()) - } else { - None - }; - - let _ = sender.send(format!( - "EXIT:{}:{}", - exit_code, - error_message.unwrap_or_default() - )); - }); - - // Start a local task in the main thread to listen to the message - // channel. - let addr_clone = addr.clone(); - - // Use actix's fut::wrap_future to convert a standard Future to an - // ActorFuture. - ctx.spawn(fut::wrap_future::<_, Self>(async move { - // Use a loop to poll the receiver. - let mut continue_running = true; - - while continue_running { - match receiver.try_recv() { - Ok(msg) => { - if msg.starts_with("EXIT:") { - // Parse the exit status. - let parts: Vec<&str> = msg.splitn(3, ':').collect(); - if parts.len() >= 2 { - let exit_code = - parts[1].parse::().unwrap_or(-1); - let error_message = if parts.len() > 2 - && !parts[2].is_empty() - { - Some(parts[2].to_string()) - } else { - None - }; - - // Send the exit message. - addr_clone.do_send( - BuiltinFunctionOutput::Exit { - exit_code, - error_message, - }, - ); - continue_running = false; - } - } else if msg.starts_with("normal_line:") { - // Parse and send normal logs. - let content = msg.replacen("normal_line:", "", 1); - addr_clone.do_send( - BuiltinFunctionOutput::NormalLine(content), - ); - } else if msg.starts_with("normal_partial:") { - // Parse and send normal partial logs. - let content = - msg.replacen("normal_partial:", "", 1); - addr_clone.do_send( - BuiltinFunctionOutput::NormalPartial(content), - ); - } else if msg.starts_with("error_line:") { - // Parse and send error line logs. - let content = msg.replacen("error_line:", "", 1); - addr_clone.do_send( - BuiltinFunctionOutput::ErrorLine(content), - ); - } else if msg.starts_with("error_partial:") { - // Parse and send error partial logs. - let content = msg.replacen("error_partial:", "", 1); - addr_clone.do_send( - BuiltinFunctionOutput::ErrorPartial(content), - ); - } - } - Err(mpsc::TryRecvError::Empty) => { - // No message, temporarily yield control. - tokio::task::yield_now().await; - } - Err(mpsc::TryRecvError::Disconnected) => { - // The sender has disconnected, exit the loop. - continue_running = false; - } - } - } - })); + run_installation( + self.tman_config.clone(), + self.tman_metadata.clone(), + install_command, + ctx, + ); } } diff --git a/core/src/ten_manager/src/designer/builtin_function/install_all.rs b/core/src/ten_manager/src/designer/builtin_function/install_all.rs index 1ed24cb6d..a0f5817fe 100644 --- a/core/src/ten_manager/src/designer/builtin_function/install_all.rs +++ b/core/src/ten_manager/src/designer/builtin_function/install_all.rs @@ -4,17 +4,13 @@ // Licensed under the Apache License, Version 2.0, with certain conditions. // Refer to the "LICENSE" file in the root directory for more information. // -use std::sync::{mpsc, Arc}; -use std::thread; - -use actix::{fut, AsyncContext}; use actix_web_actors::ws::WebsocketContext; use ten_rust::pkg_info::manifest::support::ManifestSupport; -use super::{BuiltinFunctionOutput, WsBuiltinFunction}; -use crate::output::channel::TmanOutputChannel; -use crate::output::TmanOutput; +use super::common::run_installation; +use super::WsBuiltinFunction; +use crate::cmd::cmd_install::LocalInstallMode; impl WsBuiltinFunction { pub fn install_all( @@ -29,127 +25,17 @@ impl WsBuiltinFunction { os: None, arch: None, }, - local_install_mode: crate::cmd::cmd_install::LocalInstallMode::Link, + local_install_mode: LocalInstallMode::Link, standalone: false, local_path: None, cwd: base_dir.clone(), }; - let addr = ctx.address(); - - // Create a channel for cross-thread communication. - let (sender, receiver) = mpsc::channel(); - - let output_channel = Arc::new(Box::new(TmanOutputChannel { - sender: sender.clone(), - }) as Box); - - // Clone the config and metadata before the async block. - let tman_config = self.tman_config.clone(); - let tman_metadata = self.tman_metadata.clone(); - - // Run the installation process in a new thread. - thread::spawn(move || { - // Create a new Tokio runtime to execute asynchronous code. - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - - // Execute the installation in the new runtime. - let result = rt.block_on(async { - crate::cmd::cmd_install::execute_cmd( - tman_config, - tman_metadata, - install_command, - output_channel, - ) - .await - }); - - // Send the completion status. - let exit_code = if result.is_ok() { 0 } else { -1 }; - let error_message = if let Err(err) = result { - Some(err.to_string()) - } else { - None - }; - let _ = sender.send(format!( - "EXIT:{}:{}", - exit_code, - error_message.unwrap_or_default() - )); - }); - - // Start a local task in the main thread to listen to the message - // channel. - let addr_clone = addr.clone(); - - // Use actix's fut::wrap_future to convert a standard Future to an - // ActorFuture. - ctx.spawn(fut::wrap_future::<_, Self>(async move { - // Use a loop to poll the receiver. - let mut continue_running = true; - while continue_running { - match receiver.try_recv() { - Ok(msg) => { - if msg.starts_with("EXIT:") { - // Parse the exit status. - let parts: Vec<&str> = msg.splitn(3, ':').collect(); - if parts.len() >= 2 { - let exit_code = - parts[1].parse::().unwrap_or(-1); - let error_message = if parts.len() > 2 - && !parts[2].is_empty() - { - Some(parts[2].to_string()) - } else { - None - }; - - // Send the exit message. - addr_clone.do_send( - BuiltinFunctionOutput::Exit { - exit_code, - error_message, - }, - ); - continue_running = false; - } - } else if msg.starts_with("normal_line:") { - // Parse and send normal logs. - let content = msg.replacen("normal_line:", "", 1); - addr_clone.do_send( - BuiltinFunctionOutput::NormalLine(content), - ); - } else if msg.starts_with("normal_partial:") { - let content = - msg.replacen("normal_partial:", "", 1); - addr_clone.do_send( - BuiltinFunctionOutput::NormalPartial(content), - ); - } else if msg.starts_with("error_line:") { - let content = msg.replacen("error_line:", "", 1); - addr_clone.do_send( - BuiltinFunctionOutput::ErrorLine(content), - ); - } else if msg.starts_with("error_partial:") { - let content = msg.replacen("error_partial:", "", 1); - addr_clone.do_send( - BuiltinFunctionOutput::ErrorPartial(content), - ); - } - } - Err(mpsc::TryRecvError::Empty) => { - // No message, temporarily yield control. - tokio::task::yield_now().await; - } - Err(mpsc::TryRecvError::Disconnected) => { - // The sender has disconnected, exit the loop. - continue_running = false; - } - } - } - })); + run_installation( + self.tman_config.clone(), + self.tman_metadata.clone(), + install_command, + ctx, + ); } } diff --git a/core/src/ten_manager/src/designer/builtin_function/mod.rs b/core/src/ten_manager/src/designer/builtin_function/mod.rs index 9734088a9..258aef739 100644 --- a/core/src/ten_manager/src/designer/builtin_function/mod.rs +++ b/core/src/ten_manager/src/designer/builtin_function/mod.rs @@ -4,6 +4,7 @@ // Licensed under the Apache License, Version 2.0, with certain conditions. // Refer to the "LICENSE" file in the root directory for more information. // +mod common; mod install; mod install_all; pub mod msg;