Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/ten_manager/src/cmd/cmd_designer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
constants::DESIGNER_BACKEND_DEFAULT_PORT,
designer::{configure_routes, frontend::get_frontend_asset, DesignerState},
fs::{check_is_app_folder, get_cwd},
output::{TmanOutput, TmanOutputCli},
output::{cli::TmanOutputCli, TmanOutput},
pkg_info::get_all_pkgs::get_all_pkgs_in_app,
};

Expand Down
139 changes: 139 additions & 0 deletions core/src/ten_manager/src/designer/builtin_function/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
//
// Copyright © 2025 Agora
// This file is part of TEN Framework, an open source project.
// Licensed under the Apache License, Version 2.0, with certain conditions.
// Refer to the "LICENSE" file in the root directory for more information.
//
use std::sync::{mpsc, Arc};
use std::thread;

use actix::{fut, AsyncContext};
use actix_web_actors::ws::WebsocketContext;

use crate::cmd::cmd_install::InstallCommand;
use crate::config::metadata::TmanMetadata;
use crate::config::TmanConfig;
use crate::designer::builtin_function::{
BuiltinFunctionOutput, WsBuiltinFunction,
};
use crate::output::channel::TmanOutputChannel;
use crate::output::TmanOutput;

pub fn run_installation(
tman_config: Arc<tokio::sync::RwLock<TmanConfig>>,
tman_metadata: Arc<tokio::sync::RwLock<TmanMetadata>>,
install_command: InstallCommand,
ctx: &mut WebsocketContext<WsBuiltinFunction>,
) {
let addr = ctx.address();

// Create a channel for cross-thread communication.
let (sender, receiver) = mpsc::channel();

let output_channel = Arc::new(Box::new(TmanOutputChannel {
sender: sender.clone(),
}) as Box<dyn TmanOutput>);

// Run the installation process in a new thread.
thread::spawn(move || {
// Create a new Tokio runtime to execute asynchronous code.
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();

// Execute the installation in the new runtime.
let result = rt.block_on(async {
crate::cmd::cmd_install::execute_cmd(
tman_config,
tman_metadata,
install_command,
output_channel,
)
.await
});

// Send the completion status.
let exit_code = if result.is_ok() { 0 } else { -1 };
let error_message = if let Err(err) = result {
Some(err.to_string())
} else {
None
};

let _ = sender.send(format!(
"EXIT:{}:{}",
exit_code,
error_message.unwrap_or_default()
));
});

// Start a local task in the main thread to listen to the message channel.
let addr_clone = addr.clone();

// Use actix's fut::wrap_future to convert a standard Future to an
// ActorFuture.
ctx.spawn(fut::wrap_future::<_, WsBuiltinFunction>(async move {
// Use a loop to poll the receiver.
let mut continue_running = true;

while continue_running {
match receiver.try_recv() {
Ok(msg) => {
if msg.starts_with("EXIT:") {
// Parse the exit status.
let parts: Vec<&str> = msg.splitn(3, ':').collect();
if parts.len() >= 2 {
let exit_code =
parts[1].parse::<i32>().unwrap_or(-1);
let error_message =
if parts.len() > 2 && !parts[2].is_empty() {
Some(parts[2].to_string())
} else {
None
};

// Send the exit message.
addr_clone.do_send(BuiltinFunctionOutput::Exit {
exit_code,
error_message,
});
continue_running = false;
}
} else if msg.starts_with("normal_line:") {
// Parse and send normal logs.
let content = msg.replacen("normal_line:", "", 1);
addr_clone.do_send(BuiltinFunctionOutput::NormalLine(
content,
));
} else if msg.starts_with("normal_partial:") {
// Parse and send normal partial logs.
let content = msg.replacen("normal_partial:", "", 1);
addr_clone.do_send(
BuiltinFunctionOutput::NormalPartial(content),
);
} else if msg.starts_with("error_line:") {
// Parse and send error line logs.
let content = msg.replacen("error_line:", "", 1);
addr_clone
.do_send(BuiltinFunctionOutput::ErrorLine(content));
} else if msg.starts_with("error_partial:") {
// Parse and send error partial logs.
let content = msg.replacen("error_partial:", "", 1);
addr_clone.do_send(
BuiltinFunctionOutput::ErrorPartial(content),
);
}
}
Err(mpsc::TryRecvError::Empty) => {
// No message, temporarily yield control.
tokio::task::yield_now().await;
}
Err(mpsc::TryRecvError::Disconnected) => {
// The sender has disconnected, exit the loop.
continue_running = false;
}
}
}
}));
}
58 changes: 10 additions & 48 deletions core/src/ten_manager/src/designer/builtin_function/install.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,13 @@
// Licensed under the Apache License, Version 2.0, with certain conditions.
// Refer to the "LICENSE" file in the root directory for more information.
//
use std::sync::Arc;

use actix::AsyncContext;
use actix_web_actors::ws::WebsocketContext;
use tokio::task::LocalSet;

use ten_rust::pkg_info::manifest::support::ManifestSupport;

use super::{msg::TmanOutputWs, BuiltinFunctionOutput, WsBuiltinFunction};
use crate::output::TmanOutput;
use super::common::run_installation;
use super::WsBuiltinFunction;
use crate::cmd::cmd_install::LocalInstallMode;

impl WsBuiltinFunction {
pub fn install(
Expand All @@ -37,52 +34,17 @@ impl WsBuiltinFunction {
os: None,
arch: None,
},
local_install_mode: crate::cmd::cmd_install::LocalInstallMode::Link,
local_install_mode: LocalInstallMode::Link,
standalone: false,
local_path: None,
cwd: base_dir.clone(),
};

let addr = ctx.address();
let output_ws: Arc<Box<dyn TmanOutput>> =
Arc::new(Box::new(TmanOutputWs { addr: addr.clone() }));

let addr = addr.clone();

// Clone the tman_config to avoid borrowing self in the async task.
let tman_config = self.tman_config.clone();
let tman_metadata = self.tman_metadata.clone();

// Create a LocalSet to ensure spawn_local runs on this thread.
let local = LocalSet::new();

// Spawn the task within the LocalSet context.
local.spawn_local(async move {
let result = crate::cmd::cmd_install::execute_cmd(
tman_config,
tman_metadata,
install_command,
output_ws,
)
.await;

// Notify the WebSocket client that the task is complete, and
// determine the exit code based on the result.
let exit_code = if result.is_ok() { 0 } else { -1 };
let error_message = if let Err(err) = result {
Some(err.to_string())
} else {
None
};
addr.do_send(BuiltinFunctionOutput::Exit {
exit_code,
error_message,
});
});

// Use spawn to run the LocalSet in the background.
actix_web::rt::spawn(async move {
local.await;
});
run_installation(
self.tman_config.clone(),
self.tman_metadata.clone(),
install_command,
ctx,
);
}
}
58 changes: 10 additions & 48 deletions core/src/ten_manager/src/designer/builtin_function/install_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,13 @@
// Licensed under the Apache License, Version 2.0, with certain conditions.
// Refer to the "LICENSE" file in the root directory for more information.
//
use std::sync::Arc;

use actix::AsyncContext;
use actix_web_actors::ws::WebsocketContext;
use tokio::task::LocalSet;

use ten_rust::pkg_info::manifest::support::ManifestSupport;

use super::{msg::TmanOutputWs, BuiltinFunctionOutput, WsBuiltinFunction};
use crate::output::TmanOutput;
use super::common::run_installation;
use super::WsBuiltinFunction;
use crate::cmd::cmd_install::LocalInstallMode;

impl WsBuiltinFunction {
pub fn install_all(
Expand All @@ -28,52 +25,17 @@ impl WsBuiltinFunction {
os: None,
arch: None,
},
local_install_mode: crate::cmd::cmd_install::LocalInstallMode::Link,
local_install_mode: LocalInstallMode::Link,
standalone: false,
local_path: None,
cwd: base_dir.clone(),
};

let addr = ctx.address();
let output_ws: Arc<Box<dyn TmanOutput>> =
Arc::new(Box::new(TmanOutputWs { addr: addr.clone() }));

let addr = addr.clone();

// Clone the config and metadata before the async block.
let tman_config = self.tman_config.clone();
let tman_metadata = self.tman_metadata.clone();

// Create a LocalSet to ensure spawn_local runs on this thread.
let local = LocalSet::new();

// Spawn the task within the LocalSet context.
local.spawn_local(async move {
// Now perform the actual work.
let result = crate::cmd::cmd_install::execute_cmd(
tman_config,
tman_metadata,
install_command,
output_ws,
)
.await;

// Notify the WebSocket client that the task is complete.
let exit_code = if result.is_ok() { 0 } else { -1 };
let error_message = if let Err(err) = result {
Some(err.to_string())
} else {
None
};
addr.do_send(BuiltinFunctionOutput::Exit {
exit_code,
error_message,
});
});

// Use spawn to run the LocalSet in the background.
actix_web::rt::spawn(async move {
local.await;
});
run_installation(
self.tman_config.clone(),
self.tman_metadata.clone(),
install_command,
ctx,
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
// Licensed under the Apache License, Version 2.0, with certain conditions.
// Refer to the "LICENSE" file in the root directory for more information.
//
mod common;
mod install;
mod install_all;
pub mod msg;
Expand Down
3 changes: 2 additions & 1 deletion core/src/ten_manager/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ use anyhow::Result;
use console::Emoji;
use ten_manager::cmd::execute_cmd;
use ten_manager::config::metadata::TmanMetadata;
use ten_manager::output::cli::TmanOutputCli;
use tokio::runtime::Runtime;

use ten_manager::cmd_line;
use ten_manager::constants::GITHUB_RELEASE_PAGE;
use ten_manager::output::{TmanOutput, TmanOutputCli};
use ten_manager::output::TmanOutput;
use ten_manager::version::VERSION;
use ten_manager::version_utils::check_update;

Expand Down
37 changes: 37 additions & 0 deletions core/src/ten_manager/src/output/channel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
//
// Copyright © 2025 Agora
// This file is part of TEN Framework, an open source project.
// Licensed under the Apache License, Version 2.0, with certain conditions.
// Refer to the "LICENSE" file in the root directory for more information.
//
use std::sync::mpsc;

use super::TmanOutput;

// A TmanOutput implementation to send logs to the channel.
#[derive(Clone)]
pub struct TmanOutputChannel {
pub sender: mpsc::Sender<String>,
}

impl TmanOutput for TmanOutputChannel {
fn normal_line(&self, text: &str) {
let _ = self.sender.send(format!("normal_line:{}", text));
}

fn normal_partial(&self, text: &str) {
let _ = self.sender.send(format!("normal_partial:{}", text));
}

fn error_line(&self, text: &str) {
let _ = self.sender.send(format!("error_line:{}", text));
}

fn error_partial(&self, text: &str) {
let _ = self.sender.send(format!("error_partial:{}", text));
}

fn is_interactive(&self) -> bool {
false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,7 @@
// Licensed under the Apache License, Version 2.0, with certain conditions.
// Refer to the "LICENSE" file in the root directory for more information.
//

/// Abstract all log output methods: CLI, WebSocket, etc.
pub trait TmanOutput: Send + Sync {
/// General information.
fn normal_line(&self, text: &str);
fn normal_partial(&self, text: &str);

/// Error information.
fn error_line(&self, text: &str);
fn error_partial(&self, text: &str);

/// Whether it is interactive (e.g., can block waiting for user input in CLI
/// environment).
fn is_interactive(&self) -> bool;
}
use super::TmanOutput;

/// Output for CLI: directly println / eprintln.
pub struct TmanOutputCli;
Expand Down
Loading
Loading