From 38b5c06065364047c82c11150f3d50b627938121 Mon Sep 17 00:00:00 2001 From: dreampuf Date: Mon, 23 Jun 2025 16:33:38 +0800 Subject: [PATCH 1/2] fix the streaming output in `--keep-order` --- src/output/task.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/output/task.rs b/src/output/task.rs index 3584fa5..305d1fe 100644 --- a/src/output/task.rs +++ b/src/output/task.rs @@ -57,7 +57,7 @@ impl OutputTask { if self.keep_order { // When keep-order is enabled, buffer outputs and process them in order let mut buffered_outputs: BTreeMap = BTreeMap::new(); - let mut next_line_number = 0; + let mut next_line_number = 1; while let Some(output_message) = receiver.recv().await { let line_number = output_message.input_line_number.line_number; From 92cde298f5fa4825ae96f91cb42c64967900dfe7 Mon Sep 17 00:00:00 2001 From: dreampuf Date: Sat, 28 Jun 2025 01:01:50 +0800 Subject: [PATCH 2/2] moving stdout and stderr into pb.suspend --- src/command.rs | 4 +- src/output.rs | 17 +++++++-- src/output/task.rs | 94 +++++++++++++++++++++++++++++----------------- src/progress.rs | 8 +++- 4 files changed, 80 insertions(+), 43 deletions(-) diff --git a/src/command.rs b/src/command.rs index 1e831ec..5091f6d 100644 --- a/src/command.rs +++ b/src/command.rs @@ -110,14 +110,14 @@ impl CommandService { let context = Arc::new(CommandRunContext { child_process_factory: ChildProcessFactory::new(command_line_args), command_metrics: CommandMetrics::default(), - progress, + progress: Arc::clone(&progress), }); Self { command_line_args, command_path_cache: CommandPathCache::new(command_line_args), command_semaphore: Arc::new(Semaphore::new(command_line_args.jobs)), context, - output_writer: OutputWriter::new(command_line_args), + output_writer: OutputWriter::new(command_line_args, progress.progress_bar()), } } diff --git a/src/output.rs b/src/output.rs index 1a0f582..b45077f 100644 --- a/src/output.rs +++ b/src/output.rs @@ -2,6 +2,8 @@ mod task; use anyhow::Context; +use indicatif::ProgressBar; + use tokio::{ sync::mpsc::{Sender, channel}, task::JoinHandle, @@ -9,7 +11,7 @@ use tokio::{ use tracing::{debug, warn}; -use std::process::{ExitStatus, Output}; +use std::{process::{ExitStatus, Output}, sync::Arc}; use crate::{ command_line_args::CommandLineArgs, common::OwnedCommandAndArgs, input::InputLineNumber, @@ -59,15 +61,22 @@ pub struct OutputWriter { } impl OutputWriter { - pub fn new(command_line_args: &CommandLineArgs) -> Self { + pub fn new( + command_line_args: &CommandLineArgs, + progress_bar: Option>, + ) -> Self { let (sender, receiver) = channel(command_line_args.channel_capacity); debug!( "created output channel with capacity {}", command_line_args.channel_capacity, ); - let output_task_join_handle = - tokio::spawn(task::OutputTask::new(receiver, command_line_args.keep_order).run()); + let output_task_join_handle = tokio::spawn(task::OutputTask::new( + receiver, + command_line_args.keep_order, + progress_bar, + ) + .run()); Self { sender, diff --git a/src/output/task.rs b/src/output/task.rs index 305d1fe..42f6871 100644 --- a/src/output/task.rs +++ b/src/output/task.rs @@ -1,21 +1,72 @@ -use tokio::{io::AsyncWrite, sync::mpsc::Receiver}; +use indicatif::ProgressBar; + +use tokio::{io::{self, AsyncWrite}, sync::mpsc::Receiver, task, runtime::Handle}; use tracing::{debug, error, instrument, trace}; -use std::collections::BTreeMap; +use std::{collections::BTreeMap, sync::Arc}; use super::OutputMessage; +async fn process_message(output_message: OutputMessage, progress_bar: Option>) { + async fn copy(mut buffer: &[u8], output_stream: &mut (impl AsyncWrite + Unpin)) { + let result = io::copy(&mut buffer, &mut *output_stream).await; + trace!("copy result = {:?}", result); + } + + task::spawn_blocking(move || { + let mut stdout_local = io::stdout(); + let mut stderr_local = io::stderr(); + + if let Some(pb) = progress_bar.as_ref() { + pb.suspend(|| { + let rt = Handle::current(); + if !output_message.stdout.is_empty() { + rt.block_on(copy(&output_message.stdout, &mut stdout_local)); + } + if !output_message.stderr.is_empty() { + rt.block_on(copy(&output_message.stderr, &mut stderr_local)); + } + }); + } else { + let rt = Handle::current(); + if !output_message.stdout.is_empty() { + rt.block_on(copy(&output_message.stdout, &mut stdout_local)); + } + if !output_message.stderr.is_empty() { + rt.block_on(copy(&output_message.stderr, &mut stderr_local)); + } + } + + if !output_message.exit_status.success() { + error!( + "command failed: {},line={} exit_status={}", + output_message.command_and_args, + output_message.input_line_number, + output_message.exit_status.code().unwrap_or_default(), + ); + } + }) + .await + .expect("spawn_blocking failed"); +} + pub struct OutputTask { receiver: Receiver, keep_order: bool, + progress_bar: Option>, } impl OutputTask { - pub fn new(receiver: Receiver, keep_order: bool) -> Self { + pub fn new( + receiver: Receiver, + keep_order: bool, + progress_bar: Option>, + ) -> Self { Self { receiver, keep_order, + progress_bar, } } @@ -23,37 +74,10 @@ impl OutputTask { pub async fn run(self) { debug!("begin run"); - async fn copy(mut buffer: &[u8], output_stream: &mut (impl AsyncWrite + Unpin)) { - let result = tokio::io::copy(&mut buffer, &mut *output_stream).await; - trace!("copy result = {:?}", result); - } - - async fn process_output_message( - output_message: OutputMessage, - stdout: &mut (impl AsyncWrite + Unpin), - stderr: &mut (impl AsyncWrite + Unpin), - ) { - if !output_message.stdout.is_empty() { - copy(&output_message.stdout, stdout).await; - } - if !output_message.stderr.is_empty() { - copy(&output_message.stderr, stderr).await; - } - if !output_message.exit_status.success() { - error!( - "command failed: {},line={} exit_status={}", - output_message.command_and_args, - output_message.input_line_number, - output_message.exit_status.code().unwrap_or_default(), - ); - } - } - - let mut stdout = tokio::io::stdout(); - let mut stderr = tokio::io::stderr(); - let mut receiver = self.receiver; + let progress_bar = self.progress_bar; + if self.keep_order { // When keep-order is enabled, buffer outputs and process them in order let mut buffered_outputs: BTreeMap = BTreeMap::new(); @@ -67,19 +91,19 @@ impl OutputTask { // Process any buffered outputs that are ready (in order) while let Some(output_message) = buffered_outputs.remove(&next_line_number) { - process_output_message(output_message, &mut stdout, &mut stderr).await; + process_message(output_message, progress_bar.clone()).await; next_line_number += 1; } } // Process any remaining buffered outputs for (_, output_message) in buffered_outputs.into_iter() { - process_output_message(output_message, &mut stdout, &mut stderr).await; + process_message(output_message, progress_bar.clone()).await; } } else { // When keep-order is disabled, process outputs as they arrive (original behavior) while let Some(output_message) = receiver.recv().await { - process_output_message(output_message, &mut stdout, &mut stderr).await; + process_message(output_message, progress_bar.clone()).await; } } diff --git a/src/progress.rs b/src/progress.rs index 92e2988..4fae722 100644 --- a/src/progress.rs +++ b/src/progress.rs @@ -9,7 +9,7 @@ use std::sync::Arc; use crate::command_line_args::CommandLineArgs; pub struct Progress { - progress_bar: Option, + progress_bar: Option>, } impl Progress { @@ -19,7 +19,7 @@ impl Progress { } else { let style_info = style::choose_progress_style(command_line_args)?; - let progress_bar = ProgressBar::new(0); + let progress_bar = Arc::new(ProgressBar::new(0)); if style_info.enable_steady_tick { progress_bar.enable_steady_tick(Duration::from_millis(100)); } @@ -49,4 +49,8 @@ impl Progress { progress_bar.finish(); } } + + pub fn progress_bar(&self) -> Option> { + self.progress_bar.as_ref().map(Arc::clone) + } }