Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions lading/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ reqwest = { version = "0.12", default-features = false, features = [
rustc-hash = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
humantime-serde = { version = "1.1" }
serde_qs = { version = "0.15", default-features = false }
serde_yaml = { workspace = true }
sha2 = { workspace = true, features = ["std"] }
Expand Down Expand Up @@ -118,5 +119,9 @@ doctest = false
[[bin]]
name = "lading"

[package.metadata.cargo-machete]
# False positive: used via `#[serde(with = "humantime_serde")]` field attributes.
ignored = ["humantime-serde"]

[lints]
workspace = true
23 changes: 22 additions & 1 deletion lading/src/generator/file_gen/logrotate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ use std::{
num::NonZeroU32,
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};

use tokio::time::{Instant, MissedTickBehavior, interval_at};

use byte_unit::Byte;
use futures::future::join_all;
use metrics::counter;
Expand Down Expand Up @@ -117,6 +120,10 @@ pub enum Error {
ThrottleConversion(#[from] ThrottleConversionError),
}

fn default_flush_every() -> Duration {
Duration::MAX
}

#[derive(Debug, Deserialize, Serialize, PartialEq, Clone)]
#[serde(deny_unknown_fields)]
/// Configuration of [`FileGen`]
Expand Down Expand Up @@ -150,6 +157,10 @@ pub struct Config {
/// Throughput profile controlling emission rate (bytes or blocks).
#[serde(default)]
pub throttle: Option<ThrottleConfig>,
/// Force flush at a regular interval. Defaults to `Duration::MAX`.
/// Accepts human-readable durations (e.g., "1s", "500ms", "2s").
#[serde(default = "default_flush_every", with = "humantime_serde")]
pub flush_every: Duration,
}

#[derive(Debug)]
Expand Down Expand Up @@ -240,6 +251,7 @@ impl Server {
throughput_throttle,
shutdown.clone(),
child_labels,
config.flush_every,
);

handles.push(tokio::spawn(child.spin()));
Expand Down Expand Up @@ -290,6 +302,7 @@ struct Child {
throttle: BlockThrottle,
shutdown: lading_signal::Watcher,
labels: Vec<(String, String)>,
flush_every: Duration,
}

impl Child {
Expand All @@ -303,6 +316,7 @@ impl Child {
throttle: BlockThrottle,
shutdown: lading_signal::Watcher,
labels: Vec<(String, String)>,
flush_every: Duration,
) -> Self {
let mut names = Vec::with_capacity((total_rotations + 1).into());
names.push(PathBuf::from(basename));
Expand All @@ -325,6 +339,7 @@ impl Child {
throttle,
shutdown,
labels,
flush_every,
}
}

Expand All @@ -335,6 +350,8 @@ impl Child {
.maximum_capacity_bytes(self.maximum_block_size);
let mut total_bytes_written: u64 = 0;
let maximum_bytes_per_log: u64 = u64::from(self.maximum_bytes_per_log.get());
let mut flush_interval = interval_at(Instant::now() + self.flush_every, self.flush_every);
flush_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

let total_names = self.names.len();
// SAFETY: By construction there is guaranteed to be at least one name.
Expand Down Expand Up @@ -390,6 +407,9 @@ impl Child {
}
}
}
_tick = flush_interval.tick() => {
fp.flush().await.map_err(|err| Error::IoFlush { err })?;
}
() = &mut shutdown_wait => {
fp.flush().await.map_err(|err| Error::IoFlush { err })?;
drop(fp);
Expand All @@ -401,7 +421,7 @@ impl Child {
}
}
}

/// Writes a block to the file, rotating if necessary.
#[expect(clippy::too_many_arguments)]
async fn write_bytes(
blk: &block::Block,
Expand Down Expand Up @@ -480,6 +500,7 @@ async fn write_bytes(
})?,
);
*total_bytes_written = 0;
return Ok(());
}

Ok(())
Expand Down
20 changes: 18 additions & 2 deletions lading/src/generator/file_gen/traditional.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ use std::{
Arc,
atomic::{AtomicU32, Ordering},
},
time::Duration,
};

use tokio::time::{Instant, MissedTickBehavior, interval_at};

use byte_unit::Byte;
use metrics::counter;
use rand::{SeedableRng, prelude::StdRng};
Expand Down Expand Up @@ -78,6 +81,10 @@ fn default_rotation() -> bool {
true
}

fn default_flush_every() -> Duration {
Duration::MAX
}

#[derive(Debug, Deserialize, Serialize, PartialEq, Clone)]
#[serde(deny_unknown_fields)]
/// Configuration of [`FileGen`]
Expand Down Expand Up @@ -121,6 +128,10 @@ pub struct Config {
rotate: bool,
/// The load throttle configuration
pub throttle: Option<ThrottleConfig>,
/// Force flush at a regular interval. Defaults to `Duration::MAX`.
/// Accepts human-readable durations (e.g., "1s", "500ms", "2s").
#[serde(default = "default_flush_every", with = "humantime_serde")]
pub flush_every: Duration,
}

#[derive(Debug)]
Expand Down Expand Up @@ -200,6 +211,7 @@ impl Server {
file_index: Arc::clone(&file_index),
rotate: config.rotate,
shutdown: shutdown.clone(),
flush_every: config.flush_every,
};

handles.spawn(child.spin());
Expand Down Expand Up @@ -275,6 +287,7 @@ struct Child {
rotate: bool,
file_index: Arc<AtomicU32>,
shutdown: lading_signal::Watcher,
flush_every: Duration,
}

impl Child {
Expand All @@ -287,8 +300,6 @@ impl Child {
let mut path = path_from_template(&self.path_template, file_index);

let mut handle = self.block_cache.handle();
// Setting write buffer capacity (per second) approximately equal to the throttle's maximum capacity
// (converted to bytes if necessary) to approximate flush every second.
let buffer_capacity = self
.throttle
.maximum_capacity_bytes(self.maximum_block_size);
Expand All @@ -312,6 +323,8 @@ impl Child {
);
let shutdown_wait = self.shutdown.recv();
tokio::pin!(shutdown_wait);
let mut flush_interval = interval_at(Instant::now() + self.flush_every, self.flush_every);
flush_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop {
tokio::select! {
result = self.throttle.wait_for_block(&self.block_cache, &handle) => {
Expand Down Expand Up @@ -363,6 +376,9 @@ impl Child {
}
}
}
_tick = flush_interval.tick() => {
fp.flush().await?;
}
() = &mut shutdown_wait => {
fp.flush().await?;
info!("shutdown signal received");
Expand Down
Loading