diff --git a/Cargo.lock b/Cargo.lock index 6442d953220bb..0c0fe6f3cb86f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1190,9 +1190,9 @@ dependencies = [ [[package]] name = "chrono" -version = "0.4.38" +version = "0.4.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401" +checksum = "1a7964611d71df112cb1730f2ee67324fcf4d0fc6606acbbe9bfe06df124637c" dependencies = [ "android-tzdata", "iana-time-zone", @@ -1200,7 +1200,7 @@ dependencies = [ "num-traits", "serde", "wasm-bindgen", - "windows-targets 0.52.6", + "windows-link", ] [[package]] @@ -3562,6 +3562,47 @@ version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" +[[package]] +name = "jiff" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a064218214dc6a10fbae5ec5fa888d80c45d611aba169222fc272072bf7aef6" +dependencies = [ + "jiff-static", + "jiff-tzdb-platform", + "log", + "portable-atomic", + "portable-atomic-util", + "serde", + "windows-sys 0.59.0", +] + +[[package]] +name = "jiff-static" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "199b7932d97e325aff3a7030e141eafe7f2c6268e1d1b24859b753a627f45254" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.100", +] + +[[package]] +name = "jiff-tzdb" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1283705eb0a21404d2bfd6eef2a7593d240bc42a0bdb39db0ad6fa2ec026524" + +[[package]] +name = "jiff-tzdb-platform" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "875a5a69ac2bab1a891711cf5eccbec1ce0341ea805560dcd90b7a2e925132e8" +dependencies = [ + "jiff-tzdb", +] + [[package]] name = "jni" version = "0.21.1" @@ -5376,6 +5417,21 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "portable-atomic" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "350e9b48cbc6b0e028b0473b114454c6316e57336ee184ceab6e53f72c178b3e" + +[[package]] +name = "portable-atomic-util" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507" +dependencies = [ + "portable-atomic", +] + [[package]] name = "portpicker" version = "0.1.1" @@ -9261,6 +9317,7 @@ version = "0.1.0" dependencies = [ "anyhow", "byteorder", + "jiff", "lzzzz", "memmap2 0.9.5", "parking_lot", @@ -11488,6 +11545,12 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "windows-link" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76840935b766e1b0a05c0066835fb9ec80071d4c09a16f6bd5f7e655e3c14c38" + [[package]] name = "windows-result" version = "0.2.0" diff --git a/turbopack/crates/turbo-persistence/Cargo.toml b/turbopack/crates/turbo-persistence/Cargo.toml index 2efd376c8e14f..abd656d3fcb87 100644 --- a/turbopack/crates/turbo-persistence/Cargo.toml +++ b/turbopack/crates/turbo-persistence/Cargo.toml @@ -14,6 +14,7 @@ print_stats = ["stats"] anyhow = { workspace = true } pot = "3.0.0" byteorder = "1.5.0" +jiff = "0.2.10" lzzzz = "1.1.0" memmap2 = "0.9.5" parking_lot = { workspace = true } diff --git a/turbopack/crates/turbo-persistence/src/db.rs b/turbopack/crates/turbo-persistence/src/db.rs index dbba25e521ec3..22ffc6e538614 100644 --- a/turbopack/crates/turbo-persistence/src/db.rs +++ b/turbopack/crates/turbo-persistence/src/db.rs @@ -2,7 +2,7 @@ use std::{ any::{Any, TypeId}, collections::HashSet, fs::{self, File, OpenOptions, ReadDir}, - io::Write, + io::{BufWriter, Write}, mem::{swap, transmute, MaybeUninit}, path::{Path, PathBuf}, sync::{ @@ -13,6 +13,7 @@ use std::{ use anyhow::{bail, Context, Result}; use byteorder::{ReadBytesExt, WriteBytesExt, BE}; +use jiff::Timestamp; use lzzzz::lz4::decompress; use memmap2::Mmap; use parking_lot::{Mutex, RwLock}; @@ -287,6 +288,9 @@ impl TurboPersistence { Some("CURRENT") => { // Already read } + Some("LOG") => { + // Ignored, write-only + } _ => { if !path .file_name() @@ -393,6 +397,15 @@ impl TurboPersistence { Ok(WriteBatch::new(self.path.clone(), current)) } + fn open_log(&self) -> Result> { + let log_path = self.path.join("LOG"); + let log_file = OpenOptions::new() + .create(true) + .append(true) + .open(log_path)?; + Ok(BufWriter::new(log_file)) + } + /// Commits a WriteBatch to the database. This will finish writing the data to disk and make it /// visible to readers. pub fn commit_write_batch( @@ -418,10 +431,12 @@ impl TurboPersistence { fn commit( &self, mut new_sst_files: Vec<(u32, File)>, - new_blob_files: Vec, + new_blob_files: Vec<(u32, File)>, mut indicies_to_delete: Vec, mut seq: u32, ) -> Result<(), anyhow::Error> { + let time = Timestamp::now(); + new_sst_files.sort_unstable_by_key(|(seq, _)| *seq); let mut new_sst_files = new_sst_files @@ -432,10 +447,20 @@ impl TurboPersistence { }) .collect::>>()?; - for file in new_blob_files { + for (_, file) in new_blob_files.iter() { file.sync_all()?; } + let new_sst_info = new_sst_files + .iter() + .map(|sst| { + let seq = sst.sequence_number(); + let range = sst.range()?; + let size = sst.size(); + Ok((seq, range.family, range.min_hash, range.max_hash, size)) + }) + .collect::>>()?; + if !indicies_to_delete.is_empty() { seq += 1; } @@ -479,6 +504,30 @@ impl TurboPersistence { fs::remove_file(self.path.join(format!("{seq:08}.sst")))?; } + { + let mut log = self.open_log()?; + writeln!(log, "Time {}", time)?; + let span = time.until(Timestamp::now())?; + writeln!(log, "Commit {seq:08} {:#}", span)?; + for (index, family, min, max, size) in new_sst_info.iter() { + writeln!( + log, + "{:08} SST family:{} {:016x}-{:016x} {} MiB", + index, + family, + min, + max, + size / 1024 / 1024 + )?; + } + for (seq, _) in new_blob_files.iter() { + writeln!(log, "{:08} BLOB", seq)?; + } + for index in indicies_to_delete.iter() { + writeln!(log, "{:08} DELETED", index)?; + } + } + Ok(()) } @@ -583,6 +632,7 @@ impl TurboPersistence { let value_block_cache = &self.value_block_cache; let path = &self.path; + let log_mutex = Mutex::new(()); let result = sst_by_family .into_par_iter() .with_min_len(1) @@ -604,6 +654,32 @@ impl TurboPersistence { }, ); + if !merge_jobs.is_empty() { + let guard = log_mutex.lock(); + let mut log = self.open_log()?; + writeln!( + log, + "Compaction for family {family} (coverage: {coverage}):" + )?; + for job in merge_jobs.iter() { + writeln!(log, " merge")?; + for i in job.iter() { + let index = ssts_with_ranges[*i].index; + let (min, max) = ssts_with_ranges[*i].range(); + writeln!(log, " {index:08} {min:016x}-{max:016x}")?; + } + } + if !move_jobs.is_empty() { + writeln!(log, " move")?; + for i in move_jobs.iter() { + let index = ssts_with_ranges[*i].index; + let (min, max) = ssts_with_ranges[*i].range(); + writeln!(log, " {index:08} {min:016x}-{max:016x}")?; + } + } + drop(guard); + } + // Later we will remove the merged and moved files let indicies_to_delete = merge_jobs .iter() diff --git a/turbopack/crates/turbo-persistence/src/static_sorted_file.rs b/turbopack/crates/turbo-persistence/src/static_sorted_file.rs index 1d31a292c0091..902fb917798d8 100644 --- a/turbopack/crates/turbo-persistence/src/static_sorted_file.rs +++ b/turbopack/crates/turbo-persistence/src/static_sorted_file.rs @@ -138,6 +138,11 @@ impl StaticSortedFile { self.sequence_number } + /// The size of this file in bytes. + pub fn size(&self) -> usize { + self.mmap.len() + } + /// Opens an SST file at the given path. This memory maps the file, but does not read it yet. /// It's lazy read on demand. pub fn open(sequence_number: u32, path: PathBuf) -> Result { diff --git a/turbopack/crates/turbo-persistence/src/write_batch.rs b/turbopack/crates/turbo-persistence/src/write_batch.rs index b18f93d8481bc..09fa9553ef34d 100644 --- a/turbopack/crates/turbo-persistence/src/write_batch.rs +++ b/turbopack/crates/turbo-persistence/src/write_batch.rs @@ -31,16 +31,20 @@ struct ThreadLocalState { /// The collectors for each family. collectors: [Option>; FAMILIES], /// The list of new SST files that have been created. + /// Tuple of (sequence number, file). new_sst_files: Vec<(u32, File)>, /// The list of new blob files that have been created. - new_blob_files: Vec, + /// Tuple of (sequence number, file). + new_blob_files: Vec<(u32, File)>, } /// The result of a `WriteBatch::finish` operation. pub(crate) struct FinishResult { pub(crate) sequence_number: u32, + /// Tuple of (sequence number, file). pub(crate) new_sst_files: Vec<(u32, File)>, - pub(crate) new_blob_files: Vec, + /// Tuple of (sequence number, file). + pub(crate) new_blob_files: Vec<(u32, File)>, } /// A write batch. @@ -121,7 +125,7 @@ impl WriteBatch { } else { let (blob, file) = self.create_blob(&value)?; collector.put_blob(key, blob); - state.new_blob_files.push(file); + state.new_blob_files.push((blob, file)); } Ok(()) } @@ -240,6 +244,7 @@ impl WriteBatch { } /// Creates a new blob file with the given value. + /// Returns a tuple of (sequence number, file). fn create_blob(&self, value: &[u8]) -> Result<(u32, File)> { let seq = self.current_sequence_number.fetch_add(1, Ordering::SeqCst) + 1; let mut buffer = Vec::new(); @@ -256,6 +261,7 @@ impl WriteBatch { } /// Creates a new SST file with the given collector data. + /// Returns a tuple of (sequence number, file). fn create_sst_file( &self, family: u32,