diff --git a/Cargo.lock b/Cargo.lock index 09de89d23..065f9c76f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1411,6 +1411,22 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "humantime" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424" + +[[package]] +name = "humantime-serde" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57a3db5ea5923d99402c94e9feb261dc5ee9b4efa158b0315f788cf549cc200c" +dependencies = [ + "humantime", + "serde", +] + [[package]] name = "hyper" version = "1.8.1" @@ -1860,6 +1876,7 @@ dependencies = [ "http", "http-body-util", "http-serde", + "humantime-serde", "hyper", "hyper-util", "k8s-openapi", diff --git a/lading/Cargo.toml b/lading/Cargo.toml index 1ce5d5a79..c390b9249 100644 --- a/lading/Cargo.toml +++ b/lading/Cargo.toml @@ -69,6 +69,7 @@ reqwest = { version = "0.12", default-features = false, features = [ rustc-hash = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } +humantime-serde = { version = "1.1" } serde_qs = { version = "0.15", default-features = false } serde_yaml = { workspace = true } sha2 = { workspace = true, features = ["std"] } @@ -118,5 +119,9 @@ doctest = false [[bin]] name = "lading" +[package.metadata.cargo-machete] +# False positive: used via `#[serde(with = "humantime_serde")]` field attributes. +ignored = ["humantime-serde"] + [lints] workspace = true diff --git a/lading/src/generator/file_gen/logrotate.rs b/lading/src/generator/file_gen/logrotate.rs index 11af7890c..9e77ed7e6 100644 --- a/lading/src/generator/file_gen/logrotate.rs +++ b/lading/src/generator/file_gen/logrotate.rs @@ -18,8 +18,11 @@ use std::{ num::NonZeroU32, path::{Path, PathBuf}, sync::Arc, + time::Duration, }; +use tokio::time::{Instant, MissedTickBehavior, interval_at}; + use byte_unit::Byte; use futures::future::join_all; use metrics::counter; @@ -117,6 +120,10 @@ pub enum Error { ThrottleConversion(#[from] ThrottleConversionError), } +fn default_flush_every() -> Duration { + Duration::MAX +} + #[derive(Debug, Deserialize, Serialize, PartialEq, Clone)] #[serde(deny_unknown_fields)] /// Configuration of [`FileGen`] @@ -150,6 +157,10 @@ pub struct Config { /// Throughput profile controlling emission rate (bytes or blocks). #[serde(default)] pub throttle: Option, + /// Force flush at a regular interval. Defaults to `Duration::MAX`. + /// Accepts human-readable durations (e.g., "1s", "500ms", "2s"). + #[serde(default = "default_flush_every", with = "humantime_serde")] + pub flush_every: Duration, } #[derive(Debug)] @@ -240,6 +251,7 @@ impl Server { throughput_throttle, shutdown.clone(), child_labels, + config.flush_every, ); handles.push(tokio::spawn(child.spin())); @@ -290,6 +302,7 @@ struct Child { throttle: BlockThrottle, shutdown: lading_signal::Watcher, labels: Vec<(String, String)>, + flush_every: Duration, } impl Child { @@ -303,6 +316,7 @@ impl Child { throttle: BlockThrottle, shutdown: lading_signal::Watcher, labels: Vec<(String, String)>, + flush_every: Duration, ) -> Self { let mut names = Vec::with_capacity((total_rotations + 1).into()); names.push(PathBuf::from(basename)); @@ -325,6 +339,7 @@ impl Child { throttle, shutdown, labels, + flush_every, } } @@ -335,6 +350,8 @@ impl Child { .maximum_capacity_bytes(self.maximum_block_size); let mut total_bytes_written: u64 = 0; let maximum_bytes_per_log: u64 = u64::from(self.maximum_bytes_per_log.get()); + let mut flush_interval = interval_at(Instant::now() + self.flush_every, self.flush_every); + flush_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); let total_names = self.names.len(); // SAFETY: By construction there is guaranteed to be at least one name. @@ -390,6 +407,9 @@ impl Child { } } } + _tick = flush_interval.tick() => { + fp.flush().await.map_err(|err| Error::IoFlush { err })?; + } () = &mut shutdown_wait => { fp.flush().await.map_err(|err| Error::IoFlush { err })?; drop(fp); @@ -401,7 +421,7 @@ impl Child { } } } - +/// Writes a block to the file, rotating if necessary. #[expect(clippy::too_many_arguments)] async fn write_bytes( blk: &block::Block, @@ -480,6 +500,7 @@ async fn write_bytes( })?, ); *total_bytes_written = 0; + return Ok(()); } Ok(()) diff --git a/lading/src/generator/file_gen/traditional.rs b/lading/src/generator/file_gen/traditional.rs index 4db2f5443..c067b4894 100644 --- a/lading/src/generator/file_gen/traditional.rs +++ b/lading/src/generator/file_gen/traditional.rs @@ -20,8 +20,11 @@ use std::{ Arc, atomic::{AtomicU32, Ordering}, }, + time::Duration, }; +use tokio::time::{Instant, MissedTickBehavior, interval_at}; + use byte_unit::Byte; use metrics::counter; use rand::{SeedableRng, prelude::StdRng}; @@ -78,6 +81,10 @@ fn default_rotation() -> bool { true } +fn default_flush_every() -> Duration { + Duration::MAX +} + #[derive(Debug, Deserialize, Serialize, PartialEq, Clone)] #[serde(deny_unknown_fields)] /// Configuration of [`FileGen`] @@ -121,6 +128,10 @@ pub struct Config { rotate: bool, /// The load throttle configuration pub throttle: Option, + /// Force flush at a regular interval. Defaults to `Duration::MAX`. + /// Accepts human-readable durations (e.g., "1s", "500ms", "2s"). + #[serde(default = "default_flush_every", with = "humantime_serde")] + pub flush_every: Duration, } #[derive(Debug)] @@ -200,6 +211,7 @@ impl Server { file_index: Arc::clone(&file_index), rotate: config.rotate, shutdown: shutdown.clone(), + flush_every: config.flush_every, }; handles.spawn(child.spin()); @@ -275,6 +287,7 @@ struct Child { rotate: bool, file_index: Arc, shutdown: lading_signal::Watcher, + flush_every: Duration, } impl Child { @@ -287,8 +300,6 @@ impl Child { let mut path = path_from_template(&self.path_template, file_index); let mut handle = self.block_cache.handle(); - // Setting write buffer capacity (per second) approximately equal to the throttle's maximum capacity - // (converted to bytes if necessary) to approximate flush every second. let buffer_capacity = self .throttle .maximum_capacity_bytes(self.maximum_block_size); @@ -312,6 +323,8 @@ impl Child { ); let shutdown_wait = self.shutdown.recv(); tokio::pin!(shutdown_wait); + let mut flush_interval = interval_at(Instant::now() + self.flush_every, self.flush_every); + flush_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); loop { tokio::select! { result = self.throttle.wait_for_block(&self.block_cache, &handle) => { @@ -363,6 +376,9 @@ impl Child { } } } + _tick = flush_interval.tick() => { + fp.flush().await?; + } () = &mut shutdown_wait => { fp.flush().await?; info!("shutdown signal received");