Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion core/src/ten_manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,12 @@ strum_macros = "0.26"
tar = "0.4.43"
tempfile = { version = "3.10" }
ten_rust = { path = "../ten_rust", version = "0.1.0" }
tokio = { version = "1", features = ["rt-multi-thread", "sync", "time"] }
tokio = { version = "1", features = [
"rt-multi-thread",
"sync",
"time",
"process",
] }
tokio-macros = "2.5.0"
url = { version = "2.5" }
uuid = { version = "1.0" }
Expand Down
32 changes: 19 additions & 13 deletions core/src/ten_manager/src/cmd/cmd_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
// Refer to the "LICENSE" file in the root directory for more information.
//
use std::{
io::{BufRead, BufReader},
process::{Command as StdCommand, Stdio},
sync::Arc,
};
Expand All @@ -15,6 +14,7 @@ use clap::{Arg, ArgMatches, Command};
use ten_rust::{
fs::read_file_to_string, pkg_info::constants::MANIFEST_JSON_FILENAME,
};
use tokio::io::AsyncBufReadExt;

use crate::{
config::{is_verbose, metadata::TmanMetadata, TmanConfig},
Expand Down Expand Up @@ -213,40 +213,46 @@ pub async fn execute_cmd(
cmd_builder
};

// Get the standard output and standard error of the subprocess.
command_builder
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped());

let mut child = command_builder
// Spawn the subprocess.
let mut child = tokio::process::Command::from(command_builder)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.map_err(|e| anyhow!("Failed to spawn subprocess: {}", e))?;

// Get stdout.
// Async reading of stdout.
if let Some(stdout) = child.stdout.take() {
let reader = BufReader::new(stdout);
let out = out.clone();
let mut reader = tokio::io::BufReader::new(stdout).lines();
let out_clone = out.clone();
tokio::spawn(async move {
for line in reader.lines().map_while(Result::ok) {
out.normal_line(&line);
while let Ok(Some(line)) = reader.next_line().await {
out_clone.normal_line(&line);
}
});
}

// Get stderr.
// Async reading of stderr.
if let Some(stderr) = child.stderr.take() {
let reader = BufReader::new(stderr);
let out = out.clone();
let mut reader_err = tokio::io::BufReader::new(stderr).lines();
let out_clone = out.clone();
tokio::spawn(async move {
for line in reader.lines().map_while(Result::ok) {
out.error_line(&line);
while let Ok(Some(line)) = reader_err.next_line().await {
out_clone.error_line(&line);
}
});
}

// Wait for the subprocess to exit.
// Await child exit asynchronously.
let status = child
.wait()
.await
.map_err(|e| anyhow!("Failed to wait for the subprocess: {}", e))?;

if !status.success() {
Expand Down
46 changes: 42 additions & 4 deletions core/src/ten_manager/src/designer/builtin_function/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,44 @@ pub fn run_installation(
}) as Box<dyn TmanOutput>);

// Run the installation process in a new thread.
//
// Since `cmd_install::execute_cmd()` is an operation that takes a long time
// (possibly several seconds or even minutes), although `execute_cmd()` is
// an async function and contains many await points, these await points
// may not necessarily enter the Poll::Pending state, making
// `execute_cmd()` likely to behave like a synchronous function call
// throughout its execution. In the actix-web architecture, each worker
// is a single-threaded structure, so when `execute_cmd()` blocks this
// single thread and cannot enter Pending due to await to yield control
// to the underlying event loop to handle tasks/messages, the websocket
// client messages (i.e., logs) sent during the `execute_cmd()`
// process will basically not be sent to the websocket client, and will only
// be sent all at once after `execute_cmd()` finishes. This results in a
// poor user experience, so the time-consuming `execute_cmd()` is executed
// in a new thread, where a simple async task can be used to receive
// messages/logs sent through the channel. After the new thread
// completes `execute_cmd()`, it sends the result back to the original
// thread through the channel, and the original thread then sends the result
// to the websocket client via websocket.
thread::spawn(move || {
// Create a new Tokio runtime to execute asynchronous code.
//
// Since in the whole execution flow of `cmd_install::execute_cmd()`
// does not use `tokio::spawn()` to start other tokio tasks, there is no
// need to use a multi-thread runtime. A single-thread runtime created
// using `new_current_thread` can be used.
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();

// Execute the installation in the new runtime.
//
// Because `block_on` will block the current _new_ thread, and the async
// block received by `block_on` will only execute on the current _new_
// thread, therefore, resources within `cmd_install::execute_cmd()` that
// do not have the `Send+Sync` trait, such as `clingo`, can safely run
// inside `block_on`.
let result = rt.block_on(async {
crate::cmd::cmd_install::execute_cmd(
tman_config,
Expand All @@ -53,7 +83,8 @@ pub fn run_installation(
.await
});

// Send the completion status.
// Send the completion status to the main thread (an actix worker
// thread).
let exit_code = if result.is_ok() { 0 } else { -1 };
let error_message = if let Err(err) = result {
Some(err.to_string())
Expand All @@ -68,11 +99,12 @@ pub fn run_installation(
));
});

// Start a local task in the main thread to listen to the message channel.
// Start a local task in the actix worker thread to listen to the message
// channel.
let addr_clone = addr.clone();

// Use actix's fut::wrap_future to convert a standard Future to an
// ActorFuture.
// Use actix's `fut::wrap_future` to convert a standard `Future` to an
// `ActorFuture`.
ctx.spawn(fut::wrap_future::<_, WsBuiltinFunction>(async move {
// Use a loop to poll the receiver.
let mut continue_running = true;
Expand All @@ -98,28 +130,34 @@ pub fn run_installation(
exit_code,
error_message,
});

// Exit the loop.
continue_running = false;
}
} else if msg.starts_with("normal_line:") {
// Parse and send normal logs.
let content = msg.replacen("normal_line:", "", 1);

addr_clone.do_send(BuiltinFunctionOutput::NormalLine(
content,
));
} else if msg.starts_with("normal_partial:") {
// Parse and send normal partial logs.
let content = msg.replacen("normal_partial:", "", 1);

addr_clone.do_send(
BuiltinFunctionOutput::NormalPartial(content),
);
} else if msg.starts_with("error_line:") {
// Parse and send error line logs.
let content = msg.replacen("error_line:", "", 1);

addr_clone
.do_send(BuiltinFunctionOutput::ErrorLine(content));
} else if msg.starts_with("error_partial:") {
// Parse and send error partial logs.
let content = msg.replacen("error_partial:", "", 1);

addr_clone.do_send(
BuiltinFunctionOutput::ErrorPartial(content),
);
Expand Down
80 changes: 0 additions & 80 deletions core/src/ten_rust/src/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,85 +129,6 @@ fn auto_gen_schema_bindings_from_c() {
}
}

fn auto_gen_value_bindings_from_c() {
let mut base_dir = env::current_dir()
.unwrap_or("Failed to get path of //ten_rust/src".into());
base_dir.pop();
base_dir.pop();

let mut value_header = base_dir.clone();
value_header
.push("include_internal/ten_utils/value/bindings/rust/value_proxy.h");
if !value_header.exists() {
println!("Path of value_proxy.h: {}", value_header.to_str().unwrap());
panic!("The //include_internal/ten_utils/value/bindings/rust/value_proxy.h does not exist.");
}

println!("cargo:rerun-if-changed={}", value_header.to_str().unwrap());

base_dir.push("include");

let binding_gen = bindgen::Builder::default()
.clang_arg(format!("-I{}", base_dir.to_str().unwrap()))
.no_copy("ten_value_t")
.header(value_header.to_str().unwrap())
.parse_callbacks(Box::new(bindgen::CargoCallbacks::new()))
.generate()
.expect("Unable to generate bindings");

let value_dir = Path::new("src/value/");
let generated_bindings = value_dir.join("bindings.rs");
let temp_bindings = value_dir.join(format!("bindings_{}.rs.tmp", id()));

binding_gen
.write_to_file(&temp_bindings)
.expect("Unable to write bindings into file.");

// Add some rules to the bindings file to disable clippy lints.
let bindings_content = fs::read_to_string(&temp_bindings)
.expect("Unable to read generated bindings");
let disabled_clippy_lints = [
"#![allow(non_upper_case_globals)]",
"#![allow(non_camel_case_types)]",
"#![allow(non_snake_case)]",
"#![allow(dead_code)]",
"#![allow(improper_ctypes)]",
"#![allow(improper_ctypes_definitions)]",
"#![allow(clippy::upper_case_acronyms)]",
];
let new_bindings_content =
disabled_clippy_lints.join("\n") + "\n\n" + &bindings_content;
fs::write(&temp_bindings, new_bindings_content)
.expect("Unable to add clippy lint rules to the generated bindings.");

let max_retries = 5;
// 500 milliseconds delay between retries.
let retry_delay = Duration::from_millis(500);

for attempt in 1..=max_retries {
// Atomically move the temporary file to the target file.
match fs::rename(&temp_bindings, &generated_bindings) {
Ok(_) => {
println!("File renamed successfully.");
break;
}
Err(e) if attempt < max_retries => {
println!(
"Attempt {}/{} failed: {}. Retrying...",
attempt, max_retries, e
);
thread::sleep(retry_delay);
}
Err(e) => {
panic!(
"Unable to move temporary bindings to final destination after {} attempts: {}",
max_retries, e
);
}
}
}
}

// The current auto-detection only supports limited environment combinations;
// for example, cross-compilation is not supported.
fn auto_detect_utils_library_path() -> PathBuf {
Expand Down Expand Up @@ -257,7 +178,6 @@ fn auto_detect_utils_library_path() -> PathBuf {

fn main() {
auto_gen_schema_bindings_from_c();
auto_gen_value_bindings_from_c();

// If the auto-detected utils library path is incorrect, we can specify it
// using the environment variable.
Expand Down
3 changes: 0 additions & 3 deletions core/src/ten_rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,3 @@ pub mod telemetry;
// declarations.
/// cbindgen:ignore
pub mod schema;

/// cbindgen:ignore
pub mod value;
Loading
Loading