Skip to content

Streaming output aligns with progress bar #39

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,14 @@ impl CommandService {
let context = Arc::new(CommandRunContext {
child_process_factory: ChildProcessFactory::new(command_line_args),
command_metrics: CommandMetrics::default(),
progress,
progress: Arc::clone(&progress),
});
Self {
command_line_args,
command_path_cache: CommandPathCache::new(command_line_args),
command_semaphore: Arc::new(Semaphore::new(command_line_args.jobs)),
context,
output_writer: OutputWriter::new(command_line_args),
output_writer: OutputWriter::new(command_line_args, progress.progress_bar()),
}
}

Expand Down
17 changes: 13 additions & 4 deletions src/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ mod task;

use anyhow::Context;

use indicatif::ProgressBar;

use tokio::{
sync::mpsc::{Sender, channel},
task::JoinHandle,
};

use tracing::{debug, warn};

use std::process::{ExitStatus, Output};
use std::{process::{ExitStatus, Output}, sync::Arc};

use crate::{
command_line_args::CommandLineArgs, common::OwnedCommandAndArgs, input::InputLineNumber,
Expand Down Expand Up @@ -59,15 +61,22 @@ pub struct OutputWriter {
}

impl OutputWriter {
pub fn new(command_line_args: &CommandLineArgs) -> Self {
pub fn new(
command_line_args: &CommandLineArgs,
progress_bar: Option<Arc<ProgressBar>>,
) -> Self {
let (sender, receiver) = channel(command_line_args.channel_capacity);
debug!(
"created output channel with capacity {}",
command_line_args.channel_capacity,
);

let output_task_join_handle =
tokio::spawn(task::OutputTask::new(receiver, command_line_args.keep_order).run());
let output_task_join_handle = tokio::spawn(task::OutputTask::new(
receiver,
command_line_args.keep_order,
progress_bar,
)
.run());

Self {
sender,
Expand Down
96 changes: 60 additions & 36 deletions src/output/task.rs
Original file line number Diff line number Diff line change
@@ -1,63 +1,87 @@
use tokio::{io::AsyncWrite, sync::mpsc::Receiver};
use indicatif::ProgressBar;

use tokio::{io::{self, AsyncWrite}, sync::mpsc::Receiver, task, runtime::Handle};

use tracing::{debug, error, instrument, trace};

use std::collections::BTreeMap;
use std::{collections::BTreeMap, sync::Arc};

use super::OutputMessage;

async fn process_message(output_message: OutputMessage, progress_bar: Option<Arc<ProgressBar>>) {
async fn copy(mut buffer: &[u8], output_stream: &mut (impl AsyncWrite + Unpin)) {
let result = io::copy(&mut buffer, &mut *output_stream).await;
trace!("copy result = {:?}", result);
}

task::spawn_blocking(move || {
let mut stdout_local = io::stdout();
let mut stderr_local = io::stderr();

if let Some(pb) = progress_bar.as_ref() {
pb.suspend(|| {
let rt = Handle::current();
if !output_message.stdout.is_empty() {
rt.block_on(copy(&output_message.stdout, &mut stdout_local));
}
if !output_message.stderr.is_empty() {
rt.block_on(copy(&output_message.stderr, &mut stderr_local));
}
});
} else {
let rt = Handle::current();
if !output_message.stdout.is_empty() {
rt.block_on(copy(&output_message.stdout, &mut stdout_local));
}
if !output_message.stderr.is_empty() {
rt.block_on(copy(&output_message.stderr, &mut stderr_local));
}
}

if !output_message.exit_status.success() {
error!(
"command failed: {},line={} exit_status={}",
output_message.command_and_args,
output_message.input_line_number,
output_message.exit_status.code().unwrap_or_default(),
);
}
})
.await
.expect("spawn_blocking failed");
}

pub struct OutputTask {
receiver: Receiver<OutputMessage>,
keep_order: bool,
progress_bar: Option<Arc<ProgressBar>>,
}

impl OutputTask {
pub fn new(receiver: Receiver<OutputMessage>, keep_order: bool) -> Self {
pub fn new(
receiver: Receiver<OutputMessage>,
keep_order: bool,
progress_bar: Option<Arc<ProgressBar>>,
) -> Self {
Self {
receiver,
keep_order,
progress_bar,
}
}

#[instrument(skip_all, name = "OutputTask::run", level = "debug")]
pub async fn run(self) {
debug!("begin run");

async fn copy(mut buffer: &[u8], output_stream: &mut (impl AsyncWrite + Unpin)) {
let result = tokio::io::copy(&mut buffer, &mut *output_stream).await;
trace!("copy result = {:?}", result);
}

async fn process_output_message(
output_message: OutputMessage,
stdout: &mut (impl AsyncWrite + Unpin),
stderr: &mut (impl AsyncWrite + Unpin),
) {
if !output_message.stdout.is_empty() {
copy(&output_message.stdout, stdout).await;
}
if !output_message.stderr.is_empty() {
copy(&output_message.stderr, stderr).await;
}
if !output_message.exit_status.success() {
error!(
"command failed: {},line={} exit_status={}",
output_message.command_and_args,
output_message.input_line_number,
output_message.exit_status.code().unwrap_or_default(),
);
}
}

let mut stdout = tokio::io::stdout();
let mut stderr = tokio::io::stderr();

let mut receiver = self.receiver;

let progress_bar = self.progress_bar;

if self.keep_order {
// When keep-order is enabled, buffer outputs and process them in order
let mut buffered_outputs: BTreeMap<usize, OutputMessage> = BTreeMap::new();
let mut next_line_number = 0;
let mut next_line_number = 1;

while let Some(output_message) = receiver.recv().await {
let line_number = output_message.input_line_number.line_number;
Expand All @@ -67,19 +91,19 @@ impl OutputTask {

// Process any buffered outputs that are ready (in order)
while let Some(output_message) = buffered_outputs.remove(&next_line_number) {
process_output_message(output_message, &mut stdout, &mut stderr).await;
process_message(output_message, progress_bar.clone()).await;
next_line_number += 1;
}
}

// Process any remaining buffered outputs
for (_, output_message) in buffered_outputs.into_iter() {
process_output_message(output_message, &mut stdout, &mut stderr).await;
process_message(output_message, progress_bar.clone()).await;
}
} else {
// When keep-order is disabled, process outputs as they arrive (original behavior)
while let Some(output_message) = receiver.recv().await {
process_output_message(output_message, &mut stdout, &mut stderr).await;
process_message(output_message, progress_bar.clone()).await;
}
}

Expand Down
8 changes: 6 additions & 2 deletions src/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::sync::Arc;
use crate::command_line_args::CommandLineArgs;

pub struct Progress {
progress_bar: Option<ProgressBar>,
progress_bar: Option<Arc<ProgressBar>>,
}

impl Progress {
Expand All @@ -19,7 +19,7 @@ impl Progress {
} else {
let style_info = style::choose_progress_style(command_line_args)?;

let progress_bar = ProgressBar::new(0);
let progress_bar = Arc::new(ProgressBar::new(0));
if style_info.enable_steady_tick {
progress_bar.enable_steady_tick(Duration::from_millis(100));
}
Expand Down Expand Up @@ -49,4 +49,8 @@ impl Progress {
progress_bar.finish();
}
}

pub fn progress_bar(&self) -> Option<Arc<ProgressBar>> {
self.progress_bar.as_ref().map(Arc::clone)
}
}