From 03d59520185859554c0f92e721641da5647cc07a Mon Sep 17 00:00:00 2001 From: Usama Saqib Date: Tue, 3 Mar 2026 09:56:52 +0100 Subject: [PATCH 1/7] neper --- Dockerfile | 18 ++- lading/src/bin/payloadtool.rs | 7 + lading/src/blackhole.rs | 12 ++ lading/src/blackhole/neper.rs | 139 +++++++++++++++++ lading/src/generator.rs | 12 ++ lading/src/generator/neper.rs | 278 ++++++++++++++++++++++++++++++++++ 6 files changed, 465 insertions(+), 1 deletion(-) create mode 100644 lading/src/blackhole/neper.rs create mode 100644 lading/src/generator/neper.rs diff --git a/Dockerfile b/Dockerfile index 54893cefb..149507731 100644 --- a/Dockerfile +++ b/Dockerfile @@ -77,13 +77,29 @@ RUN --mount=type=secret,id=aws_access_key_id \ export RUSTC_WRAPPER=sccache && \ cargo build --release --locked --bin lading --features logrotate_fs -# Stage 3: Runtime +# Stage 3: Build neper binaries +FROM docker.io/debian:bookworm-20241202-slim AS neper-builder +RUN apt-get update && apt-get install -y \ + git \ + build-essential \ + libsctp-dev \ + && rm -rf /var/lib/apt/lists/* +RUN git clone https://github.com/google/neper.git /tmp/neper \ + && cd /tmp/neper \ + && make \ + && cp tcp_rr tcp_crr tcp_stream /usr/local/bin/ \ + && rm -rf /tmp/neper + +# Stage 4: Runtime FROM docker.io/debian:bookworm-20241202-slim RUN apt-get update && apt-get install -y \ libfuse3-dev=3.14.0-4 \ fuse3=3.14.0-4 \ && rm -rf /var/lib/apt/lists/* COPY --from=builder /app/target/release/lading /usr/bin/lading +COPY --from=neper-builder /usr/local/bin/tcp_rr /usr/local/bin/tcp_rr +COPY --from=neper-builder /usr/local/bin/tcp_crr /usr/local/bin/tcp_crr +COPY --from=neper-builder /usr/local/bin/tcp_stream /usr/local/bin/tcp_stream # Smoke test RUN ["/usr/bin/lading", "--help"] diff --git a/lading/src/bin/payloadtool.rs b/lading/src/bin/payloadtool.rs index 78abb18e6..16b949c20 100644 --- a/lading/src/bin/payloadtool.rs +++ b/lading/src/bin/payloadtool.rs @@ -508,6 +508,13 @@ fn check_generator( compute_fingerprint, ) } + 
generator::Inner::Neper(_) => { + if compute_fingerprint { + warn!("Neper not supported for fingerprinting"); + return Ok(None); + } + unimplemented!("Neper not supported") + } } } diff --git a/lading/src/blackhole.rs b/lading/src/blackhole.rs index b3387a600..dd2dc993f 100644 --- a/lading/src/blackhole.rs +++ b/lading/src/blackhole.rs @@ -18,6 +18,7 @@ pub mod tcp; pub mod udp; pub mod unix_datagram; pub mod unix_stream; +pub mod neper; #[derive(thiserror::Error, Debug)] /// Errors produced by [`Server`]. @@ -52,6 +53,9 @@ pub enum Error { /// See [`crate::blackhole::otlp::Error`] for details. #[error(transparent)] Otlp(otlp::Error), + /// See [`crate::blackhole::neper::Error`] for details. + #[error(transparent)] + Neper(neper::Error), } #[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)] @@ -101,6 +105,8 @@ pub enum Inner { Sqs(sqs::Config), /// See [`crate::blackhole::otlp::Config`] for details. Otlp(otlp::Config), + /// See [`crate::blackhole::neper::Config`] for details. + Neper(neper::Config), } #[derive(Debug)] @@ -129,6 +135,8 @@ pub enum Server { Sqs(sqs::Sqs), /// See [`crate::blackhole::otlp::Otlp`] for details. Otlp(otlp::Otlp), + /// See [`crate::blackhole::neper::Neper`] for details. + Neper(neper::Neper), } impl Server { @@ -169,6 +177,9 @@ impl Server { Inner::Otlp(conf) => { Self::Otlp(otlp::Otlp::new(&config.general, &conf, &shutdown).map_err(Error::Otlp)?) 
} + Inner::Neper(conf) => { + Self::Neper(neper::Neper::new(config.general, &conf, shutdown)) + } }; Ok(server) } @@ -196,6 +207,7 @@ impl Server { Server::Sqs(inner) => inner.run().await.map_err(Error::Sqs), Server::SplunkHec(inner) => inner.run().await.map_err(Error::SplunkHec), Server::Otlp(inner) => inner.run().await.map_err(Error::Otlp), + Server::Neper(inner) => inner.run().await.map_err(Error::Neper), } } } diff --git a/lading/src/blackhole/neper.rs b/lading/src/blackhole/neper.rs new file mode 100644 index 000000000..9b1d7da4e --- /dev/null +++ b/lading/src/blackhole/neper.rs @@ -0,0 +1,139 @@ +//! The neper network performance blackhole. +//! +//! This blackhole spawns a [neper](https://github.com/google/neper) server +//! process that listens for incoming connections from a neper client. The +//! server is kept alive until a shutdown signal is received. + +use std::{io, path::PathBuf, process::Stdio}; + +use serde::{Deserialize, Serialize}; +use tokio::process::Command; +use tracing::{info, warn}; + +use super::General; +use crate::generator::neper::Workload; + +/// Directory where neper binaries are installed. +const NEPER_BIN_DIR: &str = "/usr/local/bin"; + +#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)] +#[serde(deny_unknown_fields)] +/// Configuration for the neper blackhole. +pub struct Config { + /// The workload to serve. + pub workload: Workload, + /// Optional control port override (`-C`). + pub control_port: Option, + /// Optional data port override (`-P`). + pub data_port: Option, + /// Extra CLI arguments forwarded verbatim to the neper binary. + #[serde(default)] + pub extra_args: Vec, +} + +#[derive(thiserror::Error, Debug)] +/// Errors produced by [`Neper`]. +pub enum Error { + /// IO error + #[error(transparent)] + Io(#[from] io::Error), + /// Neper process exited with a non-zero status. 
+ #[error("neper server exited with {status}: {stderr}")] + NeperFailed { + /// Exit status + status: std::process::ExitStatus, + /// Captured stderr + stderr: String, + }, +} + +#[derive(Debug)] +/// The neper blackhole. +/// +/// Spawns a neper server binary and keeps it running until shutdown. +pub struct Neper { + config: Config, + shutdown: lading_signal::Watcher, + metric_labels: Vec<(String, String)>, +} + +impl Neper { + /// Create a new [`Neper`] blackhole instance. + #[must_use] + pub fn new(general: General, config: &Config, shutdown: lading_signal::Watcher) -> Self { + let mut metric_labels = vec![ + ("component".to_string(), "blackhole".to_string()), + ("component_name".to_string(), "neper".to_string()), + ]; + if let Some(id) = general.id { + metric_labels.push(("id".to_string(), id)); + } + + Self { + config: config.clone(), + shutdown, + metric_labels, + } + } + + /// Run the neper server until shutdown. + /// + /// # Errors + /// + /// Returns an error if spawning neper fails or it exits with a non-zero + /// status before shutdown. + pub async fn run(self) -> Result<(), Error> { + let _ = &self.metric_labels; // reserved for future metrics + + let binary = PathBuf::from(NEPER_BIN_DIR).join(self.config.workload.binary_name()); + + let mut cmd = Command::new(&binary); + + if let Some(port) = self.config.control_port { + cmd.arg("-C").arg(port.to_string()); + } + if let Some(port) = self.config.data_port { + cmd.arg("-P").arg(port.to_string()); + } + for arg in &self.config.extra_args { + cmd.arg(arg); + } + + cmd.stdout(Stdio::null()); + cmd.stderr(Stdio::piped()); + cmd.kill_on_drop(true); + + info!(?binary, "spawning neper server"); + let mut child = cmd.spawn()?; + + let shutdown_wait = self.shutdown.recv(); + tokio::pin!(shutdown_wait); + + tokio::select! 
{ + result = child.wait() => { + let status = result?; + if !status.success() { + let stderr = match child.stderr.take() { + Some(mut se) => { + use tokio::io::AsyncReadExt; + let mut buf = String::new(); + let _ = se.read_to_string(&mut buf).await; + buf + } + None => String::new(), + }; + warn!(%status, "neper server exited unexpectedly"); + return Err(Error::NeperFailed { status, stderr }); + } + // Server exited cleanly (e.g. after the client disconnected). + warn!("neper server exited on its own"); + } + () = &mut shutdown_wait => { + info!("shutdown signal received, killing neper server"); + let _ = child.kill().await; + } + } + + Ok(()) + } +} diff --git a/lading/src/generator.rs b/lading/src/generator.rs index 1140542ab..4eadce790 100644 --- a/lading/src/generator.rs +++ b/lading/src/generator.rs @@ -31,6 +31,7 @@ pub mod trace_agent; pub mod udp; pub mod unix_datagram; pub mod unix_stream; +pub mod neper; #[derive(thiserror::Error, Debug)] /// Errors produced by [`Server`]. @@ -80,6 +81,9 @@ pub enum Error { /// See [`crate::generator::trace_agent::Error`] for details. #[error(transparent)] TraceAgent(#[from] trace_agent::Error), + /// See [`crate::generator::neper::Error`] for details. + #[error(transparent)] + Neper(#[from] neper::Error), } #[derive(Debug, Deserialize, Serialize, PartialEq, Clone)] @@ -139,6 +143,8 @@ pub enum Inner { Kubernetes(kubernetes::Config), /// See [`crate::generator::trace_agent::Config`] for details. TraceAgent(trace_agent::Config), + /// See [`crate::generator::neper::Config`] for details. + Neper(neper::Config), } #[derive(Debug)] @@ -178,6 +184,8 @@ pub enum Server { Kubernetes(kubernetes::Kubernetes), /// See [`crate::generator::trace_agent::TraceAgent`] for details. TraceAgent(trace_agent::TraceAgent), + /// See [`crate::generator::neper::Neper`] for details. 
+ Neper(neper::Neper), } impl Server { @@ -245,6 +253,9 @@ impl Server { &conf, shutdown, )?), + Inner::Neper(conf) => { + Self::Neper(neper::Neper::new(config.general, &conf, shutdown)?) + } }; Ok(srv) } @@ -281,6 +292,7 @@ impl Server { Server::Container(inner) => inner.spin().await?, Server::Kubernetes(inner) => inner.spin().await?, Server::TraceAgent(inner) => inner.spin().await?, + Server::Neper(inner) => inner.spin().await?, } Ok(()) diff --git a/lading/src/generator/neper.rs b/lading/src/generator/neper.rs new file mode 100644 index 000000000..7b2b9f8c0 --- /dev/null +++ b/lading/src/generator/neper.rs @@ -0,0 +1,278 @@ +//! The neper network performance generator. +//! +//! This generator spawns a [neper](https://github.com/google/neper) client +//! process to drive TCP workloads (request-response, connection-per-request, +//! streaming) against a neper server and reports the resulting throughput as a +//! metric. +//! +//! ## Metrics +//! +//! `neper_throughput`: Throughput value reported by neper at the end of the run + +use std::{io, path::PathBuf, process::Stdio}; + +use metrics::gauge; +use serde::{Deserialize, Serialize}; +use tokio::process::Command; +use tracing::{info, warn}; + +use super::General; +use crate::generator::common::MetricsBuilder; + +/// Directory where neper binaries are installed. +const NEPER_BIN_DIR: &str = "/usr/local/bin"; + +fn default_startup_delay_seconds() -> u64 { + 5 +} + +/// Neper workload type. +#[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Clone, Copy)] +#[serde(rename_all = "snake_case")] +pub enum Workload { + /// TCP request-response + TcpRr, + /// TCP connect/request/response (new connection per request) + TcpCrr, + /// TCP streaming + TcpStream, +} + +impl Workload { + /// Return the binary name for this workload. 
+ #[must_use] + pub fn binary_name(self) -> &'static str { + match self { + Workload::TcpRr => "tcp_rr", + Workload::TcpCrr => "tcp_crr", + Workload::TcpStream => "tcp_stream", + } + } + + /// Return the key in neper's output that carries the throughput value. + fn throughput_key(self) -> &'static str { + match self { + Workload::TcpRr | Workload::TcpCrr | Workload::TcpStream => "throughput", + } + } +} + +#[derive(Debug, Deserialize, Serialize, PartialEq, Clone)] +#[serde(deny_unknown_fields)] +/// Configuration for the neper generator. +pub struct Config { + /// The workload to run. + pub workload: Workload, + /// The host (or IP) of the neper server. + pub host: String, + /// Optional control port override (`-C`). + pub control_port: Option, + /// Optional data port override (`-P`). + pub data_port: Option, + /// Duration in seconds for the neper run (`-l`). + pub duration_seconds: u64, + /// Seconds to wait before spawning neper (let the server start). + #[serde(default = "default_startup_delay_seconds")] + pub startup_delay_seconds: u64, + /// Extra CLI arguments forwarded verbatim to the neper binary. + #[serde(default)] + pub extra_args: Vec, +} + +#[derive(thiserror::Error, Debug)] +/// Errors produced by [`Neper`]. +pub enum Error { + /// IO error + #[error(transparent)] + Io(#[from] io::Error), + /// Neper process exited with a non-zero status. + #[error("neper exited with {status}: {stderr}")] + NeperFailed { + /// Exit status + status: std::process::ExitStatus, + /// Captured stderr + stderr: String, + }, + /// Could not parse throughput from neper output. + #[error("failed to parse neper output: {0}")] + OutputParse(String), +} + +#[derive(Debug)] +/// The neper generator. +/// +/// Spawns a neper client binary, waits for it to finish, and emits the +/// throughput value as a gauge metric. 
+pub struct Neper { + config: Config, + shutdown: lading_signal::Watcher, + metric_labels: Vec<(String, String)>, +} + +impl Neper { + /// Create a new [`Neper`] instance. + /// + /// # Errors + /// + /// Returns an error if configuration is invalid. + pub fn new( + general: General, + config: &Config, + shutdown: lading_signal::Watcher, + ) -> Result { + let metric_labels = MetricsBuilder::new("neper").with_id(general.id).build(); + Ok(Self { + config: config.clone(), + shutdown, + metric_labels, + }) + } + + /// Run the neper client to completion or until shutdown. + /// + /// # Errors + /// + /// Returns an error if spawning neper fails, it exits with a non-zero + /// status, or the output cannot be parsed. + pub async fn spin(self) -> Result<(), Error> { + // Give the server time to come up. + let delay = tokio::time::Duration::from_secs(self.config.startup_delay_seconds); + info!( + delay_secs = self.config.startup_delay_seconds, + "waiting for neper server to start" + ); + tokio::time::sleep(delay).await; + + let binary = PathBuf::from(NEPER_BIN_DIR).join(self.config.workload.binary_name()); + + let mut cmd = Command::new(&binary); + cmd.arg("-c") + .arg("-H") + .arg(&self.config.host) + .arg("-l") + .arg(self.config.duration_seconds.to_string()); + + if let Some(port) = self.config.control_port { + cmd.arg("-C").arg(port.to_string()); + } + if let Some(port) = self.config.data_port { + cmd.arg("-P").arg(port.to_string()); + } + for arg in &self.config.extra_args { + cmd.arg(arg); + } + + cmd.stdout(Stdio::piped()); + cmd.stderr(Stdio::piped()); + cmd.kill_on_drop(true); + + info!(?binary, "spawning neper client"); + let mut child = cmd.spawn()?; + + // Clone so we can wait for shutdown after the child finishes. + let shutdown_post = self.shutdown.clone(); + let shutdown_wait = self.shutdown.recv(); + tokio::pin!(shutdown_wait); + + tokio::select! 
{ + result = child.wait() => { + let status = result?; + if !status.success() { + let stderr = match child.stderr.take() { + Some(mut se) => { + use tokio::io::AsyncReadExt; + let mut buf = String::new(); + let _ = se.read_to_string(&mut buf).await; + buf + } + None => String::new(), + }; + return Err(Error::NeperFailed { status, stderr }); + } + + let stdout = match child.stdout.take() { + Some(mut so) => { + use tokio::io::AsyncReadExt; + let mut buf = String::new(); + let _ = so.read_to_string(&mut buf).await; + buf + } + None => String::new(), + }; + + match parse_throughput(&stdout, self.config.workload) { + Ok(value) => { + info!(throughput = value, "neper run complete"); + gauge!("neper_throughput", &self.metric_labels).set(value); + } + Err(e) => { + warn!("could not parse neper throughput: {e}"); + } + } + + // Wait for shutdown signal before returning so lading keeps + // running (e.g. to let the observer collect the metric). + shutdown_post.recv().await; + } + () = &mut shutdown_wait => { + info!("shutdown signal received, killing neper client"); + let _ = child.kill().await; + } + } + + Ok(()) + } +} + +/// Parse the throughput value from neper's stdout output. +/// +/// Neper prints key=value pairs, one per line. We look for the line matching +/// the workload's throughput key. 
+fn parse_throughput(output: &str, workload: Workload) -> Result { + let key = workload.throughput_key(); + for line in output.lines() { + let line = line.trim(); + if let Some((k, v)) = line.split_once('=') + && k.trim() == key + { + return v.trim().parse::().map_err(|e| { + Error::OutputParse(format!("could not parse '{v}' as f64: {e}")) + }); + } + } + Err(Error::OutputParse(format!( + "key '{key}' not found in neper output" + ))) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_throughput_tcp_rr() { + let output = "num_flows=1\nnum_threads=1\nthroughput=12345.67\n"; + let value = parse_throughput(output, Workload::TcpRr).unwrap(); + assert!((value - 12345.67).abs() < f64::EPSILON); + } + + #[test] + fn parse_throughput_tcp_stream() { + let output = "throughput=98765.43\nlatency=0.5\n"; + let value = parse_throughput(output, Workload::TcpStream).unwrap(); + assert!((value - 98765.43).abs() < f64::EPSILON); + } + + #[test] + fn parse_throughput_missing_key() { + let output = "latency=0.5\n"; + assert!(parse_throughput(output, Workload::TcpRr).is_err()); + } + + #[test] + fn workload_binary_names() { + assert_eq!(Workload::TcpRr.binary_name(), "tcp_rr"); + assert_eq!(Workload::TcpCrr.binary_name(), "tcp_crr"); + assert_eq!(Workload::TcpStream.binary_name(), "tcp_stream"); + } +} From a1e7b28e5785c50907d683a9325f4e84e6a9e708 Mon Sep 17 00:00:00 2001 From: Usama Saqib Date: Tue, 3 Mar 2026 13:10:36 +0100 Subject: [PATCH 2/7] remove rlimit --- lading/Cargo.toml | 2 +- lading/src/blackhole/neper.rs | 10 ++++++++++ lading/src/generator/neper.rs | 10 ++++++++++ 3 files changed, 21 insertions(+), 1 deletion(-) diff --git a/lading/Cargo.toml b/lading/Cargo.toml index 33bbd172d..6ca72c25e 100644 --- a/lading/Cargo.toml +++ b/lading/Cargo.toml @@ -54,7 +54,7 @@ k8s-openapi = { version = "0.26.0", default-features = false, features = [ metrics = { workspace = true } metrics-exporter-prometheus = { workspace = true } metrics-util = { workspace = true } 
-nix = { version = "0.30", features = ["fs", "signal"] } +nix = { version = "0.30", features = ["fs", "resource", "signal"] } num_cpus = { version = "1.17" } num-traits = { version = "0.2", default-features = false } once_cell = { workspace = true } diff --git a/lading/src/blackhole/neper.rs b/lading/src/blackhole/neper.rs index 9b1d7da4e..9c66ffb18 100644 --- a/lading/src/blackhole/neper.rs +++ b/lading/src/blackhole/neper.rs @@ -6,6 +6,7 @@ use std::{io, path::PathBuf, process::Stdio}; +use nix::sys::resource::{Resource, getrlimit, setrlimit}; use serde::{Deserialize, Serialize}; use tokio::process::Command; use tracing::{info, warn}; @@ -37,6 +38,9 @@ pub enum Error { /// IO error #[error(transparent)] Io(#[from] io::Error), + /// Failed to set resource limits. + #[error("failed to set rlimit: {0}")] + Rlimit(#[from] nix::errno::Errno), /// Neper process exited with a non-zero status. #[error("neper server exited with {status}: {stderr}")] NeperFailed { @@ -85,6 +89,12 @@ impl Neper { pub async fn run(self) -> Result<(), Error> { let _ = &self.metric_labels; // reserved for future metrics + // Raise the open file descriptor limit to the hard limit. Neper opens + // many sockets and can easily exceed the default soft limit. 
+ let (_, hard) = getrlimit(Resource::RLIMIT_NOFILE)?; + setrlimit(Resource::RLIMIT_NOFILE, hard, hard)?; + info!(nofile_limit = hard, "raised RLIMIT_NOFILE to hard limit"); + let binary = PathBuf::from(NEPER_BIN_DIR).join(self.config.workload.binary_name()); let mut cmd = Command::new(&binary); diff --git a/lading/src/generator/neper.rs b/lading/src/generator/neper.rs index 7b2b9f8c0..85aef14fe 100644 --- a/lading/src/generator/neper.rs +++ b/lading/src/generator/neper.rs @@ -12,6 +12,7 @@ use std::{io, path::PathBuf, process::Stdio}; use metrics::gauge; +use nix::sys::resource::{Resource, getrlimit, setrlimit}; use serde::{Deserialize, Serialize}; use tokio::process::Command; use tracing::{info, warn}; @@ -85,6 +86,9 @@ pub enum Error { /// IO error #[error(transparent)] Io(#[from] io::Error), + /// Failed to set resource limits. + #[error("failed to set rlimit: {0}")] + Rlimit(#[from] nix::errno::Errno), /// Neper process exited with a non-zero status. #[error("neper exited with {status}: {stderr}")] NeperFailed { @@ -135,6 +139,12 @@ impl Neper { /// Returns an error if spawning neper fails, it exits with a non-zero /// status, or the output cannot be parsed. pub async fn spin(self) -> Result<(), Error> { + // Raise the open file descriptor limit to the hard limit. Neper opens + // many sockets and can easily exceed the default soft limit. + let (_, hard) = getrlimit(Resource::RLIMIT_NOFILE)?; + setrlimit(Resource::RLIMIT_NOFILE, hard, hard)?; + info!(nofile_limit = hard, "raised RLIMIT_NOFILE to hard limit"); + // Give the server time to come up. let delay = tokio::time::Duration::from_secs(self.config.startup_delay_seconds); info!( From c88db00c5d6b7af4a50ad2ea515616e722e0f6cd Mon Sep 17 00:00:00 2001 From: Usama Saqib Date: Thu, 26 Mar 2026 16:29:26 +0100 Subject: [PATCH 3/7] fix exit race. 
emit metrics from blackhole not generator --- lading/src/blackhole/neper.rs | 162 +++++++++++++++++++++++++++------- lading/src/generator/neper.rs | 98 +++----------------- 2 files changed, 145 insertions(+), 115 deletions(-) diff --git a/lading/src/blackhole/neper.rs b/lading/src/blackhole/neper.rs index 9c66ffb18..def2baa62 100644 --- a/lading/src/blackhole/neper.rs +++ b/lading/src/blackhole/neper.rs @@ -1,11 +1,17 @@ //! The neper network performance blackhole. //! //! This blackhole spawns a [neper](https://github.com/google/neper) server -//! process that listens for incoming connections from a neper client. The -//! server is kept alive until a shutdown signal is received. +//! process that listens for incoming connections from a neper client. When +//! the server exits (after the client disconnects), its stdout is parsed for +//! the throughput value and emitted as a gauge metric. +//! +//! ## Metrics +//! +//! `neper_throughput`: Throughput value reported by the neper server use std::{io, path::PathBuf, process::Stdio}; +use metrics::gauge; use nix::sys::resource::{Resource, getrlimit, setrlimit}; use serde::{Deserialize, Serialize}; use tokio::process::Command; @@ -49,6 +55,10 @@ pub enum Error { /// Captured stderr stderr: String, }, + /// Neper server was still running when shutdown arrived and the grace + /// period expired. + #[error("neper server did not exit within the grace period after shutdown")] + ShutdownTimeout, } #[derive(Debug)] @@ -80,15 +90,19 @@ impl Neper { } } - /// Run the neper server until shutdown. + /// Run the neper server until the child exits. + /// + /// The blackhole always waits for the neper server to finish so that its + /// stdout can be parsed for the throughput metric. If a shutdown signal + /// arrives while the server is still running, a short grace period is + /// given; if the server does not exit in time it is killed and an error + /// is returned. 
/// /// # Errors /// - /// Returns an error if spawning neper fails or it exits with a non-zero - /// status before shutdown. + /// Returns an error if spawning neper fails, it exits with a non-zero + /// status, or it is still running when the shutdown grace period expires. pub async fn run(self) -> Result<(), Error> { - let _ = &self.metric_labels; // reserved for future metrics - // Raise the open file descriptor limit to the hard limit. Neper opens // many sockets and can easily exceed the default soft limit. let (_, hard) = getrlimit(Resource::RLIMIT_NOFILE)?; @@ -109,41 +123,127 @@ impl Neper { cmd.arg(arg); } - cmd.stdout(Stdio::null()); + cmd.stdout(Stdio::piped()); cmd.stderr(Stdio::piped()); cmd.kill_on_drop(true); info!(?binary, "spawning neper server"); let mut child = cmd.spawn()?; - let shutdown_wait = self.shutdown.recv(); - tokio::pin!(shutdown_wait); - - tokio::select! { - result = child.wait() => { - let status = result?; - if !status.success() { - let stderr = match child.stderr.take() { - Some(mut se) => { - use tokio::io::AsyncReadExt; - let mut buf = String::new(); - let _ = se.read_to_string(&mut buf).await; - buf + // Always wait for the child. If shutdown arrives first, give the + // server a short grace period to finish on its own before killing it. + let mut shutdown_wait = std::pin::pin!(self.shutdown.recv()); + let exited_cleanly = loop { + tokio::select! 
{ + result = child.wait() => { + let status = result?; + if !status.success() { + let stderr = read_child_stderr(&mut child).await; + warn!(%status, "neper server exited unexpectedly"); + return Err(Error::NeperFailed { status, stderr }); + } + break true; + } + () = &mut shutdown_wait => { + info!("shutdown signal received, waiting briefly for neper server to exit"); + match tokio::time::timeout( + tokio::time::Duration::from_secs(5), + child.wait(), + ).await { + Ok(Ok(status)) if status.success() => break true, + Ok(Ok(status)) => { + let stderr = read_child_stderr(&mut child).await; + warn!(%status, "neper server exited with error during grace period"); + return Err(Error::NeperFailed { status, stderr }); + } + Ok(Err(e)) => return Err(Error::Io(e)), + Err(_) => { + warn!("grace period expired, killing neper server"); + let _ = child.kill().await; + break false; } - None => String::new(), - }; - warn!(%status, "neper server exited unexpectedly"); - return Err(Error::NeperFailed { status, stderr }); + } } - // Server exited cleanly (e.g. after the client disconnected). 
- warn!("neper server exited on its own"); } - () = &mut shutdown_wait => { - info!("shutdown signal received, killing neper server"); - let _ = child.kill().await; + }; + + if exited_cleanly { + let stdout = read_child_stdout(&mut child).await; + if let Some(value) = parse_throughput(&stdout, self.config.workload) { + info!(throughput = value, "neper server run complete"); + gauge!("neper_throughput", &self.metric_labels).set(value); + } else { + warn!("could not parse neper throughput from server output"); } + Ok(()) + } else { + Err(Error::ShutdownTimeout) + } + } +} + +async fn read_child_stdout(child: &mut tokio::process::Child) -> String { + match child.stdout.take() { + Some(mut so) => { + use tokio::io::AsyncReadExt; + let mut buf = String::new(); + let _ = so.read_to_string(&mut buf).await; + buf + } + None => String::new(), + } +} + +async fn read_child_stderr(child: &mut tokio::process::Child) -> String { + match child.stderr.take() { + Some(mut se) => { + use tokio::io::AsyncReadExt; + let mut buf = String::new(); + let _ = se.read_to_string(&mut buf).await; + buf } + None => String::new(), + } +} + +/// Parse the throughput value from neper's stdout output. +/// +/// Neper prints key=value pairs, one per line. We look for the line matching +/// the workload's throughput key. 
+fn parse_throughput(output: &str, workload: Workload) -> Option { + let key = workload.throughput_key(); + for line in output.lines() { + let line = line.trim(); + if let Some((k, v)) = line.split_once('=') + && k.trim() == key + { + return v.trim().parse::().ok(); + } + } + None +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_throughput_tcp_rr() { + let output = "num_flows=1\nnum_threads=1\nthroughput=12345.67\n"; + let value = parse_throughput(output, Workload::TcpRr).unwrap(); + assert!((value - 12345.67).abs() < f64::EPSILON); + } + + #[test] + fn parse_throughput_tcp_stream() { + let output = "throughput=98765.43\nlatency=0.5\n"; + let value = parse_throughput(output, Workload::TcpStream).unwrap(); + assert!((value - 98765.43).abs() < f64::EPSILON); + } - Ok(()) + #[test] + fn parse_throughput_missing_key() { + let output = "latency=0.5\n"; + assert!(parse_throughput(output, Workload::TcpRr).is_none()); } } diff --git a/lading/src/generator/neper.rs b/lading/src/generator/neper.rs index 85aef14fe..06a838824 100644 --- a/lading/src/generator/neper.rs +++ b/lading/src/generator/neper.rs @@ -2,23 +2,17 @@ //! //! This generator spawns a [neper](https://github.com/google/neper) client //! process to drive TCP workloads (request-response, connection-per-request, -//! streaming) against a neper server and reports the resulting throughput as a -//! metric. -//! -//! ## Metrics -//! -//! `neper_throughput`: Throughput value reported by neper at the end of the run +//! streaming) against a neper server. Throughput metrics are emitted by the +//! corresponding neper blackhole (server side). use std::{io, path::PathBuf, process::Stdio}; -use metrics::gauge; use nix::sys::resource::{Resource, getrlimit, setrlimit}; use serde::{Deserialize, Serialize}; use tokio::process::Command; -use tracing::{info, warn}; +use tracing::info; use super::General; -use crate::generator::common::MetricsBuilder; /// Directory where neper binaries are installed. 
const NEPER_BIN_DIR: &str = "/usr/local/bin";
@@ -51,7 +45,8 @@ impl Workload {
     }
 
     /// Return the key in neper's output that carries the throughput value.
-    fn throughput_key(self) -> &'static str {
+    #[must_use]
+    pub fn throughput_key(self) -> &'static str {
         match self {
             Workload::TcpRr | Workload::TcpCrr | Workload::TcpStream => "throughput",
         }
@@ -97,9 +92,6 @@ pub enum Error {
         /// Captured stderr
         stderr: String,
     },
-    /// Could not parse throughput from neper output.
-    #[error("failed to parse neper output: {0}")]
-    OutputParse(String),
 }
 
 #[derive(Debug)]
@@ -110,7 +102,6 @@ pub enum Error {
 pub struct Neper {
     config: Config,
     shutdown: lading_signal::Watcher,
-    metric_labels: Vec<(String, String)>,
 }
 
 impl Neper {
@@ -120,15 +111,13 @@ impl Neper {
     ///
     /// Returns an error if configuration is invalid.
     pub fn new(
-        general: General,
+        _general: General,
        config: &Config,
        shutdown: lading_signal::Watcher,
-    ) -> Result<Self, Error> {
-        let metric_labels = MetricsBuilder::new("neper").with_id(general.id).build();
        Ok(Self {
            config: config.clone(),
            shutdown,
-            metric_labels,
        })
    }

@@ -136,8 +125,8 @@ impl Neper {
    ///
    /// # Errors
    ///
-    /// Returns an error if spawning neper fails, it exits with a non-zero
-    /// status, or the output cannot be parsed.
+    /// Returns an error if spawning neper fails or it exits with a non-zero
+    /// status.
    pub async fn spin(self) -> Result<(), Error> {
        // Raise the open file descriptor limit to the hard limit. Neper opens
        // many sockets and can easily exceed the default soft limit.
@@ -172,19 +161,20 @@ impl Neper {
            cmd.arg(arg);
        }

-        cmd.stdout(Stdio::piped());
+        cmd.stdout(Stdio::null());
        cmd.stderr(Stdio::piped());
        cmd.kill_on_drop(true);

        info!(?binary, "spawning neper client");
        let mut child = cmd.spawn()?;

-        // Clone so we can wait for shutdown after the child finishes.
        let shutdown_post = self.shutdown.clone();
        let shutdown_wait = self.shutdown.recv();
        tokio::pin!(shutdown_wait);

        tokio::select!
{ + biased; + result = child.wait() => { let status = result?; if !status.success() { @@ -199,29 +189,10 @@ impl Neper { }; return Err(Error::NeperFailed { status, stderr }); } + info!("neper client finished"); - let stdout = match child.stdout.take() { - Some(mut so) => { - use tokio::io::AsyncReadExt; - let mut buf = String::new(); - let _ = so.read_to_string(&mut buf).await; - buf - } - None => String::new(), - }; - - match parse_throughput(&stdout, self.config.workload) { - Ok(value) => { - info!(throughput = value, "neper run complete"); - gauge!("neper_throughput", &self.metric_labels).set(value); - } - Err(e) => { - warn!("could not parse neper throughput: {e}"); - } - } - - // Wait for shutdown signal before returning so lading keeps - // running (e.g. to let the observer collect the metric). + // Wait for shutdown so lading keeps running while the + // blackhole collects and emits the throughput metric. shutdown_post.recv().await; } () = &mut shutdown_wait => { @@ -234,51 +205,10 @@ impl Neper { } } -/// Parse the throughput value from neper's stdout output. -/// -/// Neper prints key=value pairs, one per line. We look for the line matching -/// the workload's throughput key. 
-fn parse_throughput(output: &str, workload: Workload) -> Result<f64, Error> {
-    let key = workload.throughput_key();
-    for line in output.lines() {
-        let line = line.trim();
-        if let Some((k, v)) = line.split_once('=')
-            && k.trim() == key
-        {
-            return v.trim().parse::<f64>().map_err(|e| {
-                Error::OutputParse(format!("could not parse '{v}' as f64: {e}"))
-            });
-        }
-    }
-    Err(Error::OutputParse(format!(
-        "key '{key}' not found in neper output"
-    )))
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
 
-    #[test]
-    fn parse_throughput_tcp_rr() {
-        let output = "num_flows=1\nnum_threads=1\nthroughput=12345.67\n";
-        let value = parse_throughput(output, Workload::TcpRr).unwrap();
-        assert!((value - 12345.67).abs() < f64::EPSILON);
-    }
-
-    #[test]
-    fn parse_throughput_tcp_stream() {
-        let output = "throughput=98765.43\nlatency=0.5\n";
-        let value = parse_throughput(output, Workload::TcpStream).unwrap();
-        assert!((value - 98765.43).abs() < f64::EPSILON);
-    }
-
-    #[test]
-    fn parse_throughput_missing_key() {
-        let output = "latency=0.5\n";
-        assert!(parse_throughput(output, Workload::TcpRr).is_err());
-    }
-
     #[test]
     fn workload_binary_names() {
         assert_eq!(Workload::TcpRr.binary_name(), "tcp_rr");

From 86ee5ddd9170b425e4fcdb231575ccac1956f227 Mon Sep 17 00:00:00 2001
From: Usama Saqib
Date: Thu, 26 Mar 2026 16:42:10 +0100
Subject: [PATCH 4/7] fix things

---
 lading/src/blackhole.rs | 6 ++---
 lading/src/blackhole/neper.rs | 50 +++++++++++++++++------------------
 lading/src/generator.rs | 6 ++---
 3 files changed, 28 insertions(+), 34 deletions(-)

diff --git a/lading/src/blackhole.rs b/lading/src/blackhole.rs
index dd2dc993f..605768511 100644
--- a/lading/src/blackhole.rs
+++ b/lading/src/blackhole.rs
@@ -11,6 +11,7 @@ mod common;
 pub mod datadog;
 pub mod datadog_stateful_logs;
 pub mod http;
+pub mod neper;
 pub mod otlp;
 pub mod splunk_hec;
 pub mod sqs;
@@ -18,7 +19,6 @@ pub mod tcp;
 pub mod udp;
 pub mod unix_datagram;
 pub mod unix_stream;
-pub mod neper;
 
 #[derive(thiserror::Error, Debug)]
 /// Errors
produced by [`Server`]. @@ -177,9 +177,7 @@ impl Server { Inner::Otlp(conf) => { Self::Otlp(otlp::Otlp::new(&config.general, &conf, &shutdown).map_err(Error::Otlp)?) } - Inner::Neper(conf) => { - Self::Neper(neper::Neper::new(config.general, &conf, shutdown)) - } + Inner::Neper(conf) => Self::Neper(neper::Neper::new(config.general, &conf, shutdown)), }; Ok(server) } diff --git a/lading/src/blackhole/neper.rs b/lading/src/blackhole/neper.rs index def2baa62..d560859cd 100644 --- a/lading/src/blackhole/neper.rs +++ b/lading/src/blackhole/neper.rs @@ -133,35 +133,33 @@ impl Neper { // Always wait for the child. If shutdown arrives first, give the // server a short grace period to finish on its own before killing it. let mut shutdown_wait = std::pin::pin!(self.shutdown.recv()); - let exited_cleanly = loop { - tokio::select! { - result = child.wait() => { - let status = result?; - if !status.success() { + let exited_cleanly = tokio::select! { + result = child.wait() => { + let status = result?; + if !status.success() { + let stderr = read_child_stderr(&mut child).await; + warn!(%status, "neper server exited unexpectedly"); + return Err(Error::NeperFailed { status, stderr }); + } + true + } + () = &mut shutdown_wait => { + info!("shutdown signal received, waiting briefly for neper server to exit"); + match tokio::time::timeout( + tokio::time::Duration::from_secs(5), + child.wait(), + ).await { + Ok(Ok(status)) if status.success() => true, + Ok(Ok(status)) => { let stderr = read_child_stderr(&mut child).await; - warn!(%status, "neper server exited unexpectedly"); + warn!(%status, "neper server exited with error during grace period"); return Err(Error::NeperFailed { status, stderr }); } - break true; - } - () = &mut shutdown_wait => { - info!("shutdown signal received, waiting briefly for neper server to exit"); - match tokio::time::timeout( - tokio::time::Duration::from_secs(5), - child.wait(), - ).await { - Ok(Ok(status)) if status.success() => break true, - 
Ok(Ok(status)) => { - let stderr = read_child_stderr(&mut child).await; - warn!(%status, "neper server exited with error during grace period"); - return Err(Error::NeperFailed { status, stderr }); - } - Ok(Err(e)) => return Err(Error::Io(e)), - Err(_) => { - warn!("grace period expired, killing neper server"); - let _ = child.kill().await; - break false; - } + Ok(Err(e)) => return Err(Error::Io(e)), + Err(_) => { + warn!("grace period expired, killing neper server"); + let _ = child.kill().await; + false } } } diff --git a/lading/src/generator.rs b/lading/src/generator.rs index 4eadce790..62fdb81bc 100644 --- a/lading/src/generator.rs +++ b/lading/src/generator.rs @@ -22,6 +22,7 @@ pub mod file_tree; pub mod grpc; pub mod http; pub mod kubernetes; +pub mod neper; pub mod passthru_file; pub mod process_tree; pub mod procfs; @@ -31,7 +32,6 @@ pub mod trace_agent; pub mod udp; pub mod unix_datagram; pub mod unix_stream; -pub mod neper; #[derive(thiserror::Error, Debug)] /// Errors produced by [`Server`]. @@ -253,9 +253,7 @@ impl Server { &conf, shutdown, )?), - Inner::Neper(conf) => { - Self::Neper(neper::Neper::new(config.general, &conf, shutdown)?) 
- } + Inner::Neper(conf) => Self::Neper(neper::Neper::new(config.general, &conf, shutdown)?), }; Ok(srv) } From 684e090a0e4efd04d7320a39b06834aeb9e2bd9e Mon Sep 17 00:00:00 2001 From: Usama Saqib Date: Thu, 26 Mar 2026 17:03:23 +0100 Subject: [PATCH 5/7] fix clippy error --- lading/src/bin/payloadtool.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lading/src/bin/payloadtool.rs b/lading/src/bin/payloadtool.rs index 1a9635bb6..1b1f92f42 100644 --- a/lading/src/bin/payloadtool.rs +++ b/lading/src/bin/payloadtool.rs @@ -494,7 +494,7 @@ fn check_generator(config: &generator::Config, args: &Args) -> Result { - if compute_fingerprint { + if args.fingerprint { warn!("Neper not supported for fingerprinting"); return Ok(None); } From 9b7f0f2e3620d2c30222adbfcf612d559b459fb0 Mon Sep 17 00:00:00 2001 From: Usama Saqib Date: Sat, 28 Mar 2026 10:43:50 +0100 Subject: [PATCH 6/7] show observer config --- lading/src/bin/lading.rs | 7 ++++++- lading/src/observer.rs | 3 +++ lading/src/observer/linux/procfs.rs | 4 +++- 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/lading/src/bin/lading.rs b/lading/src/bin/lading.rs index 6161fac55..80937f5ec 100644 --- a/lading/src/bin/lading.rs +++ b/lading/src/bin/lading.rs @@ -597,6 +597,8 @@ async fn inner_main( let mut tsrv_joinset = task::JoinSet::new(); let mut osrv_joinset = task::JoinSet::new(); + + let target_present = config.target.is_some(); // // OBSERVER // @@ -605,6 +607,7 @@ async fn inner_main( let obs_rcv = tgt_snd.subscribe(); let observer_server = observer::Server::new(config.observer, shutdown_watcher.clone())?; let sample_period = Duration::from_millis(config.sample_period_milliseconds); + info!("starting observer {:#?}", sample_period); osrv_joinset.spawn(observer_server.run(obs_rcv, sample_period)); // @@ -625,7 +628,9 @@ async fn inner_main( async move { info!("waiting for target startup"); target_running_watcher.recv().await; - info!("target is running, now sleeping for warmup"); + if 
target_present { + info!("target is running, now sleeping for warmup"); + } sleep(warmup_duration).await; experiment_started_broadcast.signal(); info!("warmup completed, collecting samples"); diff --git a/lading/src/observer.rs b/lading/src/observer.rs index 54d5663e6..a54b2dc4d 100644 --- a/lading/src/observer.rs +++ b/lading/src/observer.rs @@ -12,6 +12,7 @@ use std::io; use crate::target::TargetPidReceiver; use serde::Deserialize; +use tracing::info; #[cfg(target_os = "linux")] mod linux; @@ -111,6 +112,8 @@ impl Server { ) -> Result<(), Error> { use crate::observer::linux::Sampler; + info!("observer running. enable_smaps {} enable_smaps_rollup {}", self.config.enable_smaps, self.config.enable_smaps_rollup); + let target_pid = pid_snd .recv() .await diff --git a/lading/src/observer/linux/procfs.rs b/lading/src/observer/linux/procfs.rs index 73098703e..6453bcfb2 100644 --- a/lading/src/observer/linux/procfs.rs +++ b/lading/src/observer/linux/procfs.rs @@ -12,7 +12,7 @@ use metrics::{counter, gauge}; use nix::errno::Errno; use procfs::process::Process; use rustc_hash::FxHashMap; -use tracing::{error, warn}; +use tracing::{error, warn, info}; use crate::observer::linux::utils::process_descendents::ProcessDescendantsIterator; @@ -235,6 +235,8 @@ impl Sampler { let uptime = uptime::poll().await?; + info!("include_smaps {include_smaps}"); + info!("enable_smaps_rollup: {0}", self.enable_smaps_rollup); // `/proc/{pid}/stat`, most especially per-process CPU data. 
if let Err(e) = pinfo.stat_sampler.poll(pid, uptime, &labels).await {
             // We don't want to bail out entirely if we can't read stats

From 68846bdfed1a731b37312734bd1bab52211c597e Mon Sep 17 00:00:00 2001
From: Usama Saqib
Date: Mon, 30 Mar 2026 11:07:45 +0200
Subject: [PATCH 7/7] fix observer

---
 lading/src/config.rs | 21 +++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git a/lading/src/config.rs b/lading/src/config.rs
index 872f469bc..e1ff38d3b 100644
--- a/lading/src/config.rs
+++ b/lading/src/config.rs
@@ -44,6 +44,9 @@ pub enum Error {
     /// Error when inspector is defined in multiple config files
     #[error("Inspector cannot be defined in multiple config files")]
     ConflictingInspector,
+    /// Error when observer is defined in multiple config files
+    #[error("Observer cannot be defined in multiple config files")]
+    ConflictingObserver,
     /// Error getting metadata for config path
     #[error("Failed to get metadata for config path {path:?}: {source}")]
     Metadata {
@@ -131,8 +134,7 @@ pub struct PartialConfig {
     #[serde(with = "serde_yaml::with::singleton_map_recursive")]
     pub generator: Vec<generator::Config>,
     /// The observer that watches the target.
-    #[serde(default)]
-    pub observer: observer::Config,
+    pub observer: Option<observer::Config>,
     /// The period on which target observations are made.
     pub sample_period_milliseconds: Option<u64>,
     /// The blackhole to supply for the target.
@@ -270,7 +272,7 @@ impl Config { Ok(Self { telemetry: partial.telemetry, generator: partial.generator, - observer: partial.observer, + observer: partial.observer.unwrap_or_default(), sample_period_milliseconds: partial .sample_period_milliseconds .unwrap_or_else(default_sample_period), @@ -325,6 +327,13 @@ impl Config { base.inspector = overlay.inspector; } + if base.observer.is_some() && overlay.observer.is_some() { + return Err(Error::ConflictingObserver); + } + if overlay.observer.is_some() { + base.observer = overlay.observer; + } + // Merge generators base.generator.extend(overlay.generator); check_duplicate_generator_ids(&base.generator)?; @@ -532,7 +541,7 @@ mod tests { PartialConfig { telemetry: None, generator: generators, - observer: observer::Config::default(), + observer: None, sample_period_milliseconds: None, blackhole: vec![], target_metrics: None, @@ -545,7 +554,7 @@ mod tests { PartialConfig { telemetry: None, generator: vec![], - observer: observer::Config::default(), + observer: None, sample_period_milliseconds: None, blackhole: blackholes, target_metrics: None, @@ -558,7 +567,7 @@ mod tests { PartialConfig { telemetry: None, generator: vec![], - observer: observer::Config::default(), + observer: None, sample_period_milliseconds: None, blackhole: vec![], target_metrics: Some(metrics),