Turbopack: add tracing to turbo-persistence #78777

Merged 5 commits on May 6, 2025
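
The change follows one pattern throughout: capture the caller's tracing span before handing work to rayon or scoped threads, then re-enter a clone of it inside each worker closure so that spans created there stay parented to the surrounding trace. A minimal sketch of that pattern, with placeholder names and work that are not taken from this PR:

    use rayon::prelude::*;
    use tracing::Span;

    fn process_all(items: &[u64]) {
        // Capture the span that is current on the coordinating thread.
        let span = Span::current();
        items.par_iter().for_each(|item| {
            // Rayon may run this closure on a thread with no current span,
            // so enter a clone of the captured span first.
            let _parent = span.clone().entered();
            // Child spans created here are parented to `span`.
            let _work = tracing::trace_span!("process item").entered();
            let _ = item;
        });
    }

The other half of the change adds #[tracing::instrument] attributes and explicit info_span!/trace_span! scopes around the expensive steps (compaction, SST merging and moving, collector flushing), and enables the turbo_persistence target in the default trace configuration.
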
9 changes: 9 additions & 0 deletions turbopack/crates/turbo-persistence/src/db.rs
@@ -18,6 +18,7 @@ use lzzzz::lz4::decompress;
use memmap2::Mmap;
use parking_lot::{Mutex, RwLock};
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
use tracing::Span;

use crate::{
arc_slice::ArcSlice,
@@ -548,6 +549,7 @@ impl TurboPersistence {
max_merge_sequence: usize,
max_merge_size: usize,
) -> Result<()> {
let _span = tracing::info_span!("compact database").entered();
if self
.active_write_operation
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
@@ -653,11 +655,13 @@ impl TurboPersistence {
let path = &self.path;

let log_mutex = Mutex::new(());
let span = Span::current();
let result = sst_by_family
.into_par_iter()
.with_min_len(1)
.enumerate()
.map(|(family, ssts_with_ranges)| {
let _span = span.clone().entered();
let coverage = total_coverage(&ssts_with_ranges, (0, u64::MAX));
if coverage <= max_coverage {
return Ok((Vec::new(), Vec::new()));
@@ -710,10 +714,12 @@ impl TurboPersistence {
.collect::<Vec<_>>();

// Merge SST files
let span = tracing::trace_span!("merge files");
let merge_result = merge_jobs
.into_par_iter()
.with_min_len(1)
.map(|indicies| {
let _span = span.clone().entered();
fn create_sst_file(
family: u32,
entries: &[LookupEntry],
@@ -722,6 +728,7 @@ impl TurboPersistence {
path: &Path,
seq: u32,
) -> Result<(u32, File)> {
let _span = tracing::trace_span!("write merged sst file").entered();
let builder = StaticSortedFileBuilder::new(
family,
entries,
@@ -865,10 +872,12 @@ impl TurboPersistence {
.collect::<Vec<_>>();

// Move SST files
let span = tracing::trace_span!("move files");
let mut new_sst_files = move_jobs
.into_par_iter()
.with_min_len(1)
.map(|(index, seq)| {
let _span = span.clone().entered();
let index = ssts_with_ranges[index].index;
let sst = &static_sorted_files[index];
let src_path = self.path.join(format!("{:08}.sst", sst.sequence_number()));
@@ -103,6 +103,7 @@ impl StaticSortedFileBuilder {
}

/// Computes an AQMF from the keys of all entries.
#[tracing::instrument(level = "trace", skip_all)]
fn compute_aqmf<E: Entry>(&mut self, entries: &[E]) {
let mut filter = qfilter::Filter::new(entries.len() as u64, AQMF_FALSE_POSITIVE_RATE)
// This won't fail as we limit the number of entries per SST file
@@ -117,6 +118,7 @@ impl StaticSortedFileBuilder {
}

/// Computes compression dictionaries from keys and values of all entries
#[tracing::instrument(level = "trace", skip_all)]
fn compute_compression_dictionary<E: Entry>(
&mut self,
entries: &[E],
@@ -202,6 +204,7 @@ impl StaticSortedFileBuilder {
}

/// Compute index, key and value blocks.
#[tracing::instrument(level = "trace", skip_all)]
fn compute_blocks<E: Entry>(&mut self, entries: &[E]) {
// TODO implement multi level index
// TODO place key and value block near to each other
@@ -352,16 +355,19 @@ impl StaticSortedFileBuilder {
}

/// Compresses an index or key block.
#[tracing::instrument(level = "trace", skip_all)]
fn compress_key_block(&self, block: &[u8]) -> (u32, Vec<u8>) {
self.compress_block(block, &self.key_compression_dictionary)
}

/// Compresses a value block.
#[tracing::instrument(level = "trace", skip_all)]
fn compress_value_block(&self, block: &[u8]) -> (u32, Vec<u8>) {
self.compress_block(block, &self.value_compression_dictionary)
}

/// Writes the SST file.
#[tracing::instrument(level = "trace", skip_all)]
pub fn write(&self, file: &Path) -> io::Result<File> {
let mut file = BufWriter::new(File::create(file)?);
// magic number and version
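
The builder methods above are wrapped with #[tracing::instrument], which creates and enters a span for the duration of each call without manual guard code; skip_all keeps the (potentially large) entry slices out of the recorded span fields. A self-contained sketch of the same attribute, using a placeholder type rather than the real builder:

    use tracing::instrument;

    struct Builder {
        blocks: Vec<Vec<u8>>,
    }

    impl Builder {
        // skip_all: record only the span name and level, not the arguments,
        // so large slices are never formatted into the trace.
        #[instrument(level = "trace", skip_all)]
        fn compute_blocks(&mut self, entries: &[u8]) {
            // Expensive work now shows up as a "compute_blocks" span.
            self.blocks.push(entries.to_vec());
        }
    }
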
12 changes: 12 additions & 0 deletions turbopack/crates/turbo-persistence/src/write_batch.rs
@@ -138,6 +138,7 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
Ok(collector)
}

#[tracing::instrument(level = "trace", skip(self, collector))]
fn flush_thread_local_collector(
&self,
family: u32,
@@ -235,6 +236,7 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
///
/// Caller must ensure that no concurrent put or delete operation is happening on the flushed
/// family.
#[tracing::instrument(level = "trace", skip(self))]
pub unsafe fn flush(&self, family: u32) -> Result<()> {
// Flush the thread local collectors to the global collector.
let mut collectors = Vec::new();
@@ -290,12 +292,14 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {

/// Finishes the write batch by returning the new sequence number and the new SST files. This
/// writes all outstanding thread local data to disk.
#[tracing::instrument(level = "trace", skip(self))]
pub(crate) fn finish(&mut self) -> Result<FinishResult> {
let mut new_blob_files = Vec::new();
let shared_error = Mutex::new(Ok(()));

// First, we flush all thread local collectors to the global collectors.
scope(|scope| {
let _span = tracing::trace_span!("flush thread local collectors").entered();
let mut collectors = [const { Vec::new() }; FAMILIES];
for cell in self.thread_locals.iter_mut() {
let state = cell.get_mut();
@@ -312,7 +316,9 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
for mut collector in thread_local_collectors {
let this = &self;
let shared_error = &shared_error;
let span = Span::current();
scope.spawn(move |_| {
let _span = span.entered();
if let Err(err) =
this.flush_thread_local_collector(family as u32, &mut collector)
{
@@ -324,13 +330,16 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
}
});

let _span = tracing::trace_span!("flush collectors").entered();

// Now we reduce the global collectors in parallel
let mut new_sst_files = take(self.new_sst_files.get_mut());
let shared_new_sst_files = Mutex::new(&mut new_sst_files);

let new_collectors = [(); FAMILIES]
.map(|_| Mutex::new(GlobalCollectorState::Unsharded(self.get_new_collector())));
let collectors = replace(&mut self.collectors, new_collectors);
let span = Span::current();
collectors
.into_par_iter()
.enumerate()
@@ -348,6 +357,7 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
}
})
.try_for_each(|(family, mut collector)| {
let _span = span.clone().entered();
let family = family as u32;
if !collector.is_empty() {
let sst = self.create_sst_file(family, collector.sorted())?;
@@ -370,6 +380,7 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {

/// Creates a new blob file with the given value.
/// Returns a tuple of (sequence number, file).
#[tracing::instrument(level = "trace", skip(self, value), fields(value_len = value.len()))]
fn create_blob(&self, value: &[u8]) -> Result<(u32, File)> {
let seq = self.current_sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
let mut buffer = Vec::new();
Expand All @@ -387,6 +398,7 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {

/// Creates a new SST file with the given collector data.
/// Returns a tuple of (sequence number, file).
#[tracing::instrument(level = "trace", skip(self, collector_data))]
fn create_sst_file(
&self,
family: u32,
@@ -184,9 +184,11 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
{
let _span = tracing::trace_span!("update task data").entered();
process_task_data(snapshots, Some(batch))?;
let span = tracing::trace_span!("flush task data").entered();
[KeySpace::TaskMeta, KeySpace::TaskData]
.into_par_iter()
.try_for_each(|key_space| {
let _span = span.clone().entered();
// Safety: We already finished all processing of the task data and task
// meta
unsafe { batch.flush(key_space) }
@@ -208,6 +210,7 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
.into_par_iter()
.with_max_len(1)
.map(|updates| {
let _span = _span.clone().entered();
let mut max_task_id = 0;

let mut task_type_bytes = Vec::new();
@@ -71,6 +71,7 @@ pub static TRACING_TURBO_TASKS_TARGETS: Lazy<Vec<&str>> = Lazy::new(|| {
"turbo_tasks_hash=trace",
"turbo_tasks_memory=trace",
"turbo_tasks_backend=trace",
"turbo_persistence=trace",
],
]
.concat()
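
Adding turbo_persistence=trace to this list is what surfaces the new spans when Turbopack tracing is enabled. Directives of this crate=level form are standard tracing filter syntax; as a generic illustration (not the repository's actual subscriber setup), such a list can be fed to an EnvFilter:

    use tracing_subscriber::{fmt, prelude::*, EnvFilter};

    fn init_tracing() -> Result<(), Box<dyn std::error::Error>> {
        let directives = ["turbo_tasks_backend=trace", "turbo_persistence=trace"].join(",");
        // EnvFilter parses comma-separated "target=level" directives.
        let filter = EnvFilter::try_new(directives)?;
        tracing_subscriber::registry()
            .with(fmt::layer())
            .with(filter)
            .init();
        Ok(())
    }
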