Skip to content

Turbopack: limit compaction merging by size instead of count #78669

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 57 additions & 4 deletions turbopack/crates/turbo-persistence/src/compaction/selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ type Range = (u64, u64);
/// An item that the compaction selector can group into merge jobs.
///
/// The selector reads both the covered range and the size of every compactable while it
/// assembles a merge job.
pub trait Compactable {
/// Returns the range of the compactable as a `(u64, u64)` pair.
fn range(&self) -> Range;

/// Returns the size of the compactable. The sizes of the members of a merge job are summed
/// and compared against `CompactConfig::max_merge_size`.
// NOTE(review): the unit is not fixed by this trait; presumably bytes for SST files — confirm.
fn size(&self) -> usize;
}

fn is_overlapping(a: &Range, b: &Range) -> bool {
Expand Down Expand Up @@ -55,11 +58,14 @@ pub fn total_coverage<T: Compactable>(compactables: &[T], full_range: Range) ->

/// Configuration for the compaction algorithm.
///
/// Callers that do not want a particular upper limit pass `usize::MAX` for it.
pub struct CompactConfig {
    /// The minimum number of files to merge at once.
    pub min_merge: usize,

    /// The maximum number of files to merge at once.
    pub max_merge: usize,

    /// The maximum size of all files to merge at once. The selector compares the running sum
    /// of `Compactable::size` values against this limit before adding another file to a
    /// merge job.
    pub max_merge_size: usize,
}

/// For a list of compactables, computes merge and move jobs that are expected to perform best.
Expand Down Expand Up @@ -102,6 +108,7 @@ fn get_compaction_jobs_internal<T: Compactable>(
let start_range = compactables[start].range();
let mut range = start_range;

let mut merge_job_size = compactables[start].size();
let mut merge_job = Vec::new();
merge_job.push(start);
let mut merge_job_input_spread = spread(&start_range) as f32;
Expand All @@ -116,8 +123,13 @@ fn get_compaction_jobs_internal<T: Compactable>(
if is_overlapping(&range, &range_for_i) {
let mut extended_range = range;
if !extend_range(&mut extended_range, &range_for_i) {
let size = compactables[i].size();
if merge_job_size + size > config.max_merge_size {
break 'outer;
}
used_compactables[i] = true;
merge_job.push(i);
merge_job_size += compactables[i].size();
merge_job_input_spread += spread(&range_for_i) as f32;
} else {
let s = spread(&range);
Expand Down Expand Up @@ -216,22 +228,32 @@ mod tests {

/// Minimal test fixture for the selector: a key range plus an explicit size.
struct TestCompactable {
/// Range reported through `Compactable::range`.
range: Range,
/// Size reported through `Compactable::size`.
size: usize,
}

/// Exposes the stored fields of the test fixture through the `Compactable` interface.
impl Compactable for TestCompactable {
    /// Hands back the fixture's range unchanged.
    fn range(&self) -> Range {
        let Self { range, .. } = self;
        *range
    }

    /// Hands back the fixture's size unchanged.
    fn size(&self) -> usize {
        let Self { size, .. } = self;
        *size
    }
}

fn compact<const N: usize>(ranges: [(u64, u64); N], max_merge: usize) -> CompactionJobs {
fn compact<const N: usize>(
ranges: [(u64, u64); N],
max_merge: usize,
max_merge_size: usize,
) -> CompactionJobs {
let compactables = ranges
.iter()
.map(|&range| TestCompactable { range })
.map(|&range| TestCompactable { range, size: 100 })
.collect::<Vec<_>>();
let config = CompactConfig {
max_merge,
min_merge: 2,
max_merge_size,
};
get_compaction_jobs(&compactables, &config)
}
Expand All @@ -255,6 +277,32 @@ mod tests {
(30, 40),
],
3,
usize::MAX,
);
assert_eq!(merge_jobs, vec![vec![0, 1, 2], vec![4, 5, 6]]);
assert_eq!(move_jobs, vec![3, 8]);
}

#[test]
fn test_compaction_jobs_by_size() {
let CompactionJobs {
merge_jobs,
move_jobs,
..
} = compact(
[
(0, 10),
(10, 30),
(9, 13),
(0, 30),
(40, 44),
(41, 42),
(41, 47),
(90, 100),
(30, 40),
],
usize::MAX,
300,
);
assert_eq!(merge_jobs, vec![vec![0, 1, 2], vec![4, 5, 6]]);
assert_eq!(move_jobs, vec![3, 8]);
Expand Down Expand Up @@ -293,6 +341,7 @@ mod tests {
let config = CompactConfig {
max_merge: 4,
min_merge: 2,
max_merge_size: usize::MAX,
};
let jobs = get_compaction_jobs(&containers, &config);
if !jobs.is_empty() {
Expand Down Expand Up @@ -337,6 +386,10 @@ mod tests {
/// Spans from the first stored key to the last stored key. Panics if `keys` is empty.
fn range(&self) -> Range {
    let first = self.keys[0];
    let last = *self.keys.last().unwrap();
    (first, last)
}

/// Uses the number of stored keys as the size of this container.
fn size(&self) -> usize {
self.keys.len()
}
}

impl Debug for Container {
Expand Down
27 changes: 23 additions & 4 deletions turbopack/crates/turbo-persistence/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,15 +534,20 @@ impl TurboPersistence {
/// Runs a full compaction on the database. This will rewrite all SST files, removing all
/// duplicate keys and separating all key ranges into unique files.
///
/// Delegates to `compact` with a coverage threshold of `0.0` and with both the merge-count
/// limit and the merge-size limit set to `usize::MAX`.
pub fn full_compact(&self) -> Result<()> {
    self.compact(0.0, usize::MAX, usize::MAX)?;
    Ok(())
}

/// Runs a (partial) compaction. Compaction will only be performed if the coverage of the SST
/// files is above the given threshold. The coverage is the average number of SST files that
/// need to be read to find a key. It also limits the maximum number of SST files that are
/// merged at once (`max_merge_sequence`), which is the main factor for the runtime of the
/// compaction, and the maximum combined size of those files (`max_merge_size`).
pub fn compact(&self, max_coverage: f32, max_merge_sequence: usize) -> Result<()> {
pub fn compact(
&self,
max_coverage: f32,
max_merge_sequence: usize,
max_merge_size: usize,
) -> Result<()> {
if self
.active_write_operation
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
Expand All @@ -568,6 +573,7 @@ impl TurboPersistence {
&mut indicies_to_delete,
max_coverage,
max_merge_sequence,
max_merge_size,
)?;
}

Expand All @@ -594,6 +600,7 @@ impl TurboPersistence {
indicies_to_delete: &mut Vec<usize>,
max_coverage: f32,
max_merge_sequence: usize,
max_merge_size: usize,
) -> Result<()> {
if static_sorted_files.is_empty() {
return Ok(());
Expand All @@ -602,18 +609,29 @@ impl TurboPersistence {
/// Pairs an SST file's position in `static_sorted_files` with the range and size read from
/// it, so the file can be handed to the compaction selector.
struct SstWithRange {
/// Index of the file within `static_sorted_files`.
index: usize,
/// Range obtained from the file's `range()` call.
range: StaticSortedFileRange,
/// Size obtained from the file's `size()` call.
size: usize,
}

/// Lets the compaction selector read the range and size captured for an SST file.
impl Compactable for SstWithRange {
    /// Reports the covered interval as `(min_hash, max_hash)`.
    fn range(&self) -> (u64, u64) {
        let bounds = &self.range;
        (bounds.min_hash, bounds.max_hash)
    }

    /// Reports the size recorded for the file.
    fn size(&self) -> usize {
        let Self { size, .. } = self;
        *size
    }
}

let ssts_with_ranges = static_sorted_files
.iter()
.enumerate()
.flat_map(|(index, sst)| sst.range().ok().map(|range| SstWithRange { index, range }))
.flat_map(|(index, sst)| {
sst.range().ok().map(|range| SstWithRange {
index,
range,
size: sst.size(),
})
})
.collect::<Vec<_>>();

let families = ssts_with_ranges
Expand Down Expand Up @@ -651,8 +669,9 @@ impl TurboPersistence {
} = get_compaction_jobs(
&ssts_with_ranges,
&CompactConfig {
max_merge: max_merge_sequence,
min_merge: 2,
max_merge: max_merge_sequence,
max_merge_size,
},
);

Expand Down
2 changes: 1 addition & 1 deletion turbopack/crates/turbo-persistence/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ fn persist_changes() -> Result<()> {
{
let db = TurboPersistence::open(path.to_path_buf())?;

db.compact(1.0, 3)?;
db.compact(1.0, 3, usize::MAX)?;

check(&db, 1, 13)?;
check(&db, 2, 22)?;
Expand Down
21 changes: 16 additions & 5 deletions turbopack/crates/turbo-tasks-backend/src/database/turbo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use crate::database::{
};

/// Coverage threshold passed to `compact` as `max_coverage` by the background compactions.
const COMPACT_MAX_COVERAGE: f32 = 10.0;
/// Upper bound on the number of SST files merged at once, passed as `max_merge_sequence`.
const COMPACT_MAX_MERGE_SEQUENCE: usize = 16;
/// Upper bound on the combined size of the files merged at once, passed as `max_merge_size`.
const COMPACT_MAX_MERGE_SIZE: usize = 512 * 1024 * 1024; // 512 MiB

pub struct TurboKeyValueDatabase {
db: Arc<TurboPersistence>,
Expand All @@ -30,8 +31,13 @@ impl TurboKeyValueDatabase {
};
// start compaction in background if the database is not empty
if !db.is_empty() {
let handle =
spawn(move || db.compact(COMPACT_MAX_COVERAGE, COMPACT_MAX_MERGE_SEQUENCE));
let handle = spawn(move || {
db.compact(
COMPACT_MAX_COVERAGE,
COMPACT_MAX_MERGE_SEQUENCE,
COMPACT_MAX_MERGE_SIZE,
)
});
this.compact_join_handle.get_mut().replace(handle);
}
Ok(this)
Expand Down Expand Up @@ -131,8 +137,13 @@ impl<'a> BaseWriteBatch<'a> for TurboWriteBatch<'a> {
if !self.initial_write {
// Start a new compaction in the background
let db = self.db.clone();
let handle =
spawn(move || db.compact(COMPACT_MAX_COVERAGE, COMPACT_MAX_MERGE_SEQUENCE));
let handle = spawn(move || {
db.compact(
COMPACT_MAX_COVERAGE,
COMPACT_MAX_MERGE_SEQUENCE,
COMPACT_MAX_MERGE_SIZE,
)
});
self.compact_join_handle.lock().replace(handle);
}

Expand Down
Loading