Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 50 additions & 2 deletions virt/arcbox-vm/src/bin/vm-agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ mod agent {
use std::collections::HashMap;
use std::io::{Read, Write};
use std::os::unix::io::RawFd;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

Expand Down Expand Up @@ -68,6 +69,13 @@ mod agent {

const MAX_FRAME_SIZE: usize = 16 * 1024 * 1024;

/// Maximum number of concurrently active connection-handling threads.
/// Prevents memory exhaustion during exec bursts (e.g. health checks).
const MAX_ACTIVE_CONNECTIONS: usize = 64;

/// Stack size for connection-handling threads (1 MB instead of the default 8 MB).
const THREAD_STACK_SIZE: usize = 1 << 20;

// -------------------------------------------------------------------------
// Protocol types
// -------------------------------------------------------------------------
Expand Down Expand Up @@ -751,6 +759,41 @@ mod agent {
}
}

/// Spawn a thread with a reduced stack size and a semaphore-style concurrency
/// limit. If the limit is already reached the connection fd is closed and a
/// warning is logged — this is preferable to OOM-killing the guest.
fn spawn_bounded(active: &Arc<AtomicUsize>, name: &str, conn_fd: RawFd, handler: fn(RawFd)) {
let current = active.fetch_add(1, Ordering::Relaxed);
if current >= MAX_ACTIVE_CONNECTIONS {
active.fetch_sub(1, Ordering::Relaxed);
eprintln!(
"agent: {name}: connection limit reached ({MAX_ACTIVE_CONNECTIONS}), dropping fd {conn_fd}"
);
// SAFETY: conn_fd is a valid, freshly accepted socket fd that nobody
// else owns yet.
unsafe { libc::close(conn_fd) };
return;
}

let active_clone = Arc::clone(active);
let thread_name = format!("{name}-{conn_fd}");
let builder = thread::Builder::new()
.name(thread_name)
.stack_size(THREAD_STACK_SIZE);

if let Err(e) = builder.spawn(move || {
handler(conn_fd);
active_clone.fetch_sub(1, Ordering::Relaxed);
Comment on lines +785 to +786
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Release active slot even if connection handler panics

The active-connection counter is decremented only after handler(conn_fd) returns normally, so any panic inside a handler leaks one slot permanently. This is realistic because the handler path still contains expect/unwrap panics (for example empty start.cmd in handle_piped), and after enough panics the counter reaches 64 and the agent starts dropping all new connections even though no threads are running. Please ensure slot release happens on unwind as well (e.g., guard with Drop or wrap handler execution).

Useful? React with 👍 / 👎.

}) {
// The closure was consumed but never executed — release the slot
// using the original reference and close the fd.
active.fetch_sub(1, Ordering::Relaxed);
eprintln!("agent: {name}: failed to spawn thread: {e}");
// SAFETY: conn_fd is still valid; the closure never ran.
unsafe { libc::close(conn_fd) };
}
}

pub fn run() {
mount_filesystems();
setup_dns();
Expand All @@ -760,18 +803,23 @@ mod agent {
let exec_fd = create_vsock_listener(AGENT_PORT);
let file_fd = create_vsock_listener(FILE_PORT);

// Shared counters for bounding active connection threads.
let file_active: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
let exec_active: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));

// File I/O listener thread.
let file_active_clone = Arc::clone(&file_active);
thread::spawn(move || {
loop {
let conn_fd = accept_connection(file_fd);
thread::spawn(move || handle_file_connection(conn_fd));
spawn_bounded(&file_active_clone, "file", conn_fd, handle_file_connection);
}
});

// Exec listener (main thread).
loop {
let conn_fd = accept_connection(exec_fd);
thread::spawn(move || handle_connection(conn_fd));
spawn_bounded(&exec_active, "exec", conn_fd, handle_connection);
}
}

Expand Down
Loading