Skip to content

Turbopack: write a LOG file for the database #78650

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 66 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions turbopack/crates/turbo-persistence/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ print_stats = ["stats"]
anyhow = { workspace = true }
pot = "3.0.0"
byteorder = "1.5.0"
jiff = "0.2.10"
lzzzz = "1.1.0"
memmap2 = "0.9.5"
parking_lot = { workspace = true }
Expand Down
82 changes: 79 additions & 3 deletions turbopack/crates/turbo-persistence/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{
any::{Any, TypeId},
collections::HashSet,
fs::{self, File, OpenOptions, ReadDir},
io::Write,
io::{BufWriter, Write},
mem::{swap, transmute, MaybeUninit},
path::{Path, PathBuf},
sync::{
Expand All @@ -13,6 +13,7 @@ use std::{

use anyhow::{bail, Context, Result};
use byteorder::{ReadBytesExt, WriteBytesExt, BE};
use jiff::Timestamp;
use lzzzz::lz4::decompress;
use memmap2::Mmap;
use parking_lot::{Mutex, RwLock};
Expand Down Expand Up @@ -287,6 +288,9 @@ impl TurboPersistence {
Some("CURRENT") => {
// Already read
}
Some("LOG") => {
// Ignored, write-only
}
_ => {
if !path
.file_name()
Expand Down Expand Up @@ -393,6 +397,15 @@ impl TurboPersistence {
Ok(WriteBatch::new(self.path.clone(), current))
}

/// Opens the database's `LOG` file for appending, creating it if it does
/// not exist yet. The writer is buffered so that the many small
/// `writeln!` calls made while logging a commit or compaction do not
/// each hit the filesystem.
fn open_log(&self) -> Result<BufWriter<File>> {
    let file = OpenOptions::new()
        .create(true)
        .append(true)
        .open(self.path.join("LOG"))?;
    Ok(BufWriter::new(file))
}

/// Commits a WriteBatch to the database. This will finish writing the data to disk and make it
/// visible to readers.
pub fn commit_write_batch<K: StoreKey + Send + Sync + 'static, const FAMILIES: usize>(
Expand All @@ -418,10 +431,12 @@ impl TurboPersistence {
fn commit(
&self,
mut new_sst_files: Vec<(u32, File)>,
new_blob_files: Vec<File>,
new_blob_files: Vec<(u32, File)>,
mut indicies_to_delete: Vec<usize>,
mut seq: u32,
) -> Result<(), anyhow::Error> {
let time = Timestamp::now();

new_sst_files.sort_unstable_by_key(|(seq, _)| *seq);

let mut new_sst_files = new_sst_files
Expand All @@ -432,10 +447,20 @@ impl TurboPersistence {
})
.collect::<Result<Vec<_>>>()?;

for file in new_blob_files {
for (_, file) in new_blob_files.iter() {
file.sync_all()?;
}

let new_sst_info = new_sst_files
.iter()
.map(|sst| {
let seq = sst.sequence_number();
let range = sst.range()?;
let size = sst.size();
Ok((seq, range.family, range.min_hash, range.max_hash, size))
})
.collect::<Result<Vec<_>>>()?;

if !indicies_to_delete.is_empty() {
seq += 1;
}
Expand Down Expand Up @@ -479,6 +504,30 @@ impl TurboPersistence {
fs::remove_file(self.path.join(format!("{seq:08}.sst")))?;
}

{
let mut log = self.open_log()?;
writeln!(log, "Time {}", time)?;
let span = time.until(Timestamp::now())?;
writeln!(log, "Commit {seq:08} {:#}", span)?;
for (index, family, min, max, size) in new_sst_info.iter() {
writeln!(
log,
"{:08} SST family:{} {:016x}-{:016x} {} MiB",
index,
family,
min,
max,
size / 1024 / 1024
)?;
}
for (seq, _) in new_blob_files.iter() {
writeln!(log, "{:08} BLOB", seq)?;
}
for index in indicies_to_delete.iter() {
writeln!(log, "{:08} DELETED", index)?;
}
}

Ok(())
}

Expand Down Expand Up @@ -583,6 +632,7 @@ impl TurboPersistence {
let value_block_cache = &self.value_block_cache;
let path = &self.path;

let log_mutex = Mutex::new(());
let result = sst_by_family
.into_par_iter()
.with_min_len(1)
Expand All @@ -604,6 +654,32 @@ impl TurboPersistence {
},
);

if !merge_jobs.is_empty() {
let guard = log_mutex.lock();
let mut log = self.open_log()?;
writeln!(
log,
"Compaction for family {family} (coverage: {coverage}):"
)?;
for job in merge_jobs.iter() {
writeln!(log, " merge")?;
for i in job.iter() {
let index = ssts_with_ranges[*i].index;
let (min, max) = ssts_with_ranges[*i].range();
writeln!(log, " {index:08} {min:016x}-{max:016x}")?;
}
}
if !move_jobs.is_empty() {
writeln!(log, " move")?;
for i in move_jobs.iter() {
let index = ssts_with_ranges[*i].index;
let (min, max) = ssts_with_ranges[*i].range();
writeln!(log, " {index:08} {min:016x}-{max:016x}")?;
}
}
drop(guard);
}

// Later we will remove the merged and moved files
let indicies_to_delete = merge_jobs
.iter()
Expand Down
5 changes: 5 additions & 0 deletions turbopack/crates/turbo-persistence/src/static_sorted_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ impl StaticSortedFile {
self.sequence_number
}

/// The size of this file in bytes.
///
/// This is the length of the memory mapping backing the SST file, which
/// corresponds to the file's size on disk at the time it was opened.
pub fn size(&self) -> usize {
    self.mmap.len()
}

/// Opens an SST file at the given path. This memory maps the file, but does not read it yet.
/// It's lazy read on demand.
pub fn open(sequence_number: u32, path: PathBuf) -> Result<Self> {
Expand Down
12 changes: 9 additions & 3 deletions turbopack/crates/turbo-persistence/src/write_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,20 @@ struct ThreadLocalState<K: StoreKey + Send, const FAMILIES: usize> {
/// The collectors for each family.
collectors: [Option<Collector<K>>; FAMILIES],
/// The list of new SST files that have been created.
/// Tuple of (sequence number, file).
new_sst_files: Vec<(u32, File)>,
/// The list of new blob files that have been created.
new_blob_files: Vec<File>,
/// Tuple of (sequence number, file).
new_blob_files: Vec<(u32, File)>,
}

/// The result of a `WriteBatch::finish` operation.
pub(crate) struct FinishResult {
    // NOTE(review): presumably the highest sequence number allocated by this
    // batch (sequence numbers are handed out via `current_sequence_number`
    // in `create_blob`/`create_sst_file`) — confirm against the caller.
    pub(crate) sequence_number: u32,
    /// The new SST files created by this batch.
    /// Tuple of (sequence number, file).
    pub(crate) new_sst_files: Vec<(u32, File)>,
    /// The new blob files created by this batch.
    /// Tuple of (sequence number, file).
    pub(crate) new_blob_files: Vec<(u32, File)>,
}

/// A write batch.
Expand Down Expand Up @@ -121,7 +125,7 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
} else {
let (blob, file) = self.create_blob(&value)?;
collector.put_blob(key, blob);
state.new_blob_files.push(file);
state.new_blob_files.push((blob, file));
}
Ok(())
}
Expand Down Expand Up @@ -240,6 +244,7 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
}

/// Creates a new blob file with the given value.
/// Returns a tuple of (sequence number, file).
fn create_blob(&self, value: &[u8]) -> Result<(u32, File)> {
let seq = self.current_sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
let mut buffer = Vec::new();
Expand All @@ -256,6 +261,7 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
}

/// Creates a new SST file with the given collector data.
/// Returns a tuple of (sequence number, file).
fn create_sst_file(
&self,
family: u32,
Expand Down
Loading