diff --git a/turbopack/crates/turbo-persistence/src/compaction/selector.rs b/turbopack/crates/turbo-persistence/src/compaction/selector.rs
index 2790b79b9771c..df0707db93fb2 100644
--- a/turbopack/crates/turbo-persistence/src/compaction/selector.rs
+++ b/turbopack/crates/turbo-persistence/src/compaction/selector.rs
@@ -19,6 +19,9 @@ type Range = (u64, u64);
 pub trait Compactable {
     /// Returns the range of the compactable.
     fn range(&self) -> Range;
+
+    /// Returns the size of the compactable.
+    fn size(&self) -> usize;
 }
 
 fn is_overlapping(a: &Range, b: &Range) -> bool {
@@ -55,11 +58,14 @@ pub fn total_coverage<T: Compactable>(compactables: &[T], full_range: Range) ->
 
 /// Configuration for the compaction algorithm.
 pub struct CompactConfig {
+    /// The minimum number of files to merge at once.
+    pub min_merge: usize,
+
     /// The maximum number of files to merge at once.
     pub max_merge: usize,
 
-    /// The minimum number of files to merge at once.
-    pub min_merge: usize,
+    /// The maximum size of all files to merge at once.
+    pub max_merge_size: usize,
 }
 
 /// For a list of compactables, computes merge and move jobs that are expected to perform best.
@@ -102,6 +108,7 @@ fn get_compaction_jobs_internal(
 
         let start_range = compactables[start].range();
         let mut range = start_range;
+        let mut merge_job_size = compactables[start].size();
         let mut merge_job = Vec::new();
        merge_job.push(start);
         let mut merge_job_input_spread = spread(&start_range) as f32;
@@ -116,8 +123,13 @@ fn get_compaction_jobs_internal(
             if is_overlapping(&range, &range_for_i) {
                 let mut extended_range = range;
                 if !extend_range(&mut extended_range, &range_for_i) {
+                    let size = compactables[i].size();
+                    if merge_job_size + size > config.max_merge_size {
+                        break 'outer;
+                    }
                     used_compactables[i] = true;
                     merge_job.push(i);
+                    merge_job_size += compactables[i].size();
                     merge_job_input_spread += spread(&range_for_i) as f32;
                 } else {
                     let s = spread(&range);
@@ -216,22 +228,32 @@ mod tests {
 
     struct TestCompactable {
         range: Range,
+        size: usize,
     }
 
     impl Compactable for TestCompactable {
         fn range(&self) -> Range {
             self.range
         }
+
+        fn size(&self) -> usize {
+            self.size
+        }
     }
 
-    fn compact<const N: usize>(ranges: [(u64, u64); N], max_merge: usize) -> CompactionJobs {
+    fn compact<const N: usize>(
+        ranges: [(u64, u64); N],
+        max_merge: usize,
+        max_merge_size: usize,
+    ) -> CompactionJobs {
         let compactables = ranges
             .iter()
-            .map(|&range| TestCompactable { range })
+            .map(|&range| TestCompactable { range, size: 100 })
             .collect::<Vec<_>>();
         let config = CompactConfig {
             max_merge,
             min_merge: 2,
+            max_merge_size,
         };
         get_compaction_jobs(&compactables, &config)
     }
@@ -255,6 +277,32 @@ mod tests {
             (30, 40),
         ],
         3,
+        usize::MAX,
+    );
+    assert_eq!(merge_jobs, vec![vec![0, 1, 2], vec![4, 5, 6]]);
+    assert_eq!(move_jobs, vec![3, 8]);
+}
+
+#[test]
+fn test_compaction_jobs_by_size() {
+    let CompactionJobs {
+        merge_jobs,
+        move_jobs,
+        ..
+    } = compact(
+        [
+            (0, 10),
+            (10, 30),
+            (9, 13),
+            (0, 30),
+            (40, 44),
+            (41, 42),
+            (41, 47),
+            (90, 100),
+            (30, 40),
+        ],
+        usize::MAX,
+        300,
     );
     assert_eq!(merge_jobs, vec![vec![0, 1, 2], vec![4, 5, 6]]);
     assert_eq!(move_jobs, vec![3, 8]);
 }
@@ -293,6 +341,7 @@ mod tests {
         let config = CompactConfig {
             max_merge: 4,
             min_merge: 2,
+            max_merge_size: usize::MAX,
         };
         let jobs = get_compaction_jobs(&containers, &config);
         if !jobs.is_empty() {
@@ -337,6 +386,10 @@ mod tests {
         fn range(&self) -> Range {
             (self.keys[0], *self.keys.last().unwrap())
         }
+
+        fn size(&self) -> usize {
+            self.keys.len()
+        }
     }
 
     impl Debug for Container {
diff --git a/turbopack/crates/turbo-persistence/src/db.rs b/turbopack/crates/turbo-persistence/src/db.rs
index 75e28036ebbc9..5f9f72d72f514 100644
--- a/turbopack/crates/turbo-persistence/src/db.rs
+++ b/turbopack/crates/turbo-persistence/src/db.rs
@@ -534,7 +534,7 @@ impl TurboPersistence {
     /// Runs a full compaction on the database. This will rewrite all SST files, removing all
     /// duplicate keys and separating all key ranges into unique files.
     pub fn full_compact(&self) -> Result<()> {
-        self.compact(0.0, usize::MAX)?;
+        self.compact(0.0, usize::MAX, usize::MAX)?;
         Ok(())
     }
 
@@ -542,7 +542,12 @@
     /// files is above the given threshold. The coverage is the average number of SST files that
     /// need to be read to find a key. It also limits the maximum number of SST files that are
     /// merged at once, which is the main factor for the runtime of the compaction.
-    pub fn compact(&self, max_coverage: f32, max_merge_sequence: usize) -> Result<()> {
+    pub fn compact(
+        &self,
+        max_coverage: f32,
+        max_merge_sequence: usize,
+        max_merge_size: usize,
+    ) -> Result<()> {
         if self
             .active_write_operation
             .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
@@ -568,6 +573,7 @@
                 &mut indicies_to_delete,
                 max_coverage,
                 max_merge_sequence,
+                max_merge_size,
             )?;
         }
 
@@ -594,6 +600,7 @@ impl TurboPersistence {
         indicies_to_delete: &mut Vec<usize>,
         max_coverage: f32,
         max_merge_sequence: usize,
+        max_merge_size: usize,
     ) -> Result<()> {
         if static_sorted_files.is_empty() {
             return Ok(());
         }
@@ -602,18 +609,29 @@ impl TurboPersistence {
         struct SstWithRange {
             index: usize,
             range: StaticSortedFileRange,
+            size: usize,
         }
 
         impl Compactable for SstWithRange {
             fn range(&self) -> (u64, u64) {
                 (self.range.min_hash, self.range.max_hash)
             }
+
+            fn size(&self) -> usize {
+                self.size
+            }
         }
 
         let ssts_with_ranges = static_sorted_files
             .iter()
             .enumerate()
-            .flat_map(|(index, sst)| sst.range().ok().map(|range| SstWithRange { index, range }))
+            .flat_map(|(index, sst)| {
+                sst.range().ok().map(|range| SstWithRange {
+                    index,
+                    range,
+                    size: sst.size(),
+                })
+            })
             .collect::<Vec<_>>();
 
         let families = ssts_with_ranges
@@ -651,8 +669,9 @@ impl TurboPersistence {
         } = get_compaction_jobs(
             &ssts_with_ranges,
             &CompactConfig {
-                max_merge: max_merge_sequence,
-                min_merge: 2,
+                min_merge: 2,
+                max_merge: max_merge_sequence,
+                max_merge_size,
             },
         );
 
diff --git a/turbopack/crates/turbo-persistence/src/tests.rs b/turbopack/crates/turbo-persistence/src/tests.rs
index 43e6668b767f2..f74adf6e95ffd 100644
--- a/turbopack/crates/turbo-persistence/src/tests.rs
+++ b/turbopack/crates/turbo-persistence/src/tests.rs
@@ -433,7 +433,7 @@ fn persist_changes() -> Result<()> {
 
     {
         let db = TurboPersistence::open(path.to_path_buf())?;
-        db.compact(1.0, 3)?;
+        db.compact(1.0, 3, usize::MAX)?;
 
         check(&db, 1, 13)?;
         check(&db, 2, 22)?;
diff --git a/turbopack/crates/turbo-tasks-backend/src/database/turbo.rs b/turbopack/crates/turbo-tasks-backend/src/database/turbo.rs
index 2c24b1919d4e7..5a90e3e4bf663 100644
--- a/turbopack/crates/turbo-tasks-backend/src/database/turbo.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/database/turbo.rs
@@ -14,7 +14,8 @@ use crate::database::{
 };
 
 const COMPACT_MAX_COVERAGE: f32 = 10.0;
-const COMPACT_MAX_MERGE_SEQUENCE: usize = 8;
+const COMPACT_MAX_MERGE_SEQUENCE: usize = 16;
+const COMPACT_MAX_MERGE_SIZE: usize = 512 * 1024 * 1024; // 512 MiB
 
 pub struct TurboKeyValueDatabase {
     db: Arc<TurboPersistence>,
@@ -30,8 +31,13 @@ impl TurboKeyValueDatabase {
         };
         // start compaction in background if the database is not empty
         if !db.is_empty() {
-            let handle =
-                spawn(move || db.compact(COMPACT_MAX_COVERAGE, COMPACT_MAX_MERGE_SEQUENCE));
+            let handle = spawn(move || {
+                db.compact(
+                    COMPACT_MAX_COVERAGE,
+                    COMPACT_MAX_MERGE_SEQUENCE,
+                    COMPACT_MAX_MERGE_SIZE,
+                )
+            });
             this.compact_join_handle.get_mut().replace(handle);
         }
         Ok(this)
@@ -131,8 +137,13 @@ impl<'a> BaseWriteBatch<'a> for TurboWriteBatch<'a> {
         if !self.initial_write {
             // Start a new compaction in the background
             let db = self.db.clone();
-            let handle =
-                spawn(move || db.compact(COMPACT_MAX_COVERAGE, COMPACT_MAX_MERGE_SEQUENCE));
+            let handle = spawn(move || {
+                db.compact(
+                    COMPACT_MAX_COVERAGE,
+                    COMPACT_MAX_MERGE_SEQUENCE,
+                    COMPACT_MAX_MERGE_SIZE,
+                )
+            });
             self.compact_join_handle.lock().replace(handle);
         }
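
Note: a minimal sketch of how a caller drives the new size-capped compaction, outside the files touched by this diff. It assumes `TurboPersistence` and its `open` constructor are exported from the `turbo_persistence` crate root and that `anyhow::Result` is the crate's error type; the constants and the function name are placeholders that simply mirror the defaults added to `turbo.rs` above, not part of the public API.

    use std::path::PathBuf;

    use anyhow::Result;
    use turbo_persistence::TurboPersistence;

    // Assumed defaults, mirroring turbo.rs in this diff.
    const MAX_COVERAGE: f32 = 10.0;
    const MAX_MERGE_SEQUENCE: usize = 16;
    const MAX_MERGE_SIZE: usize = 512 * 1024 * 1024; // 512 MiB

    fn run_compaction(path: PathBuf) -> Result<()> {
        let db = TurboPersistence::open(path)?;
        // The third argument caps the combined size of the SST files that a single
        // merge job may take as input; merge sequences stop growing once adding
        // another file would exceed this limit.
        db.compact(MAX_COVERAGE, MAX_MERGE_SEQUENCE, MAX_MERGE_SIZE)?;
        Ok(())
    }

Passing `usize::MAX` for the third argument, as `full_compact` and the tests do, restores the previous unbounded behavior.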