From 210b268cd24e32b86dca97451b893e59fcd3ad23 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Fri, 2 May 2025 09:36:43 +0200 Subject: [PATCH 1/5] add tracing --- turbopack/crates/turbo-persistence/src/write_batch.rs | 7 +++++++ .../crates/turbopack-trace-utils/src/tracing_presets.rs | 1 + 2 files changed, 8 insertions(+) diff --git a/turbopack/crates/turbo-persistence/src/write_batch.rs b/turbopack/crates/turbo-persistence/src/write_batch.rs index 2f234d2eac5b7..c63c4789b2526 100644 --- a/turbopack/crates/turbo-persistence/src/write_batch.rs +++ b/turbopack/crates/turbo-persistence/src/write_batch.rs @@ -138,6 +138,7 @@ impl WriteBatch { Ok(collector) } + #[tracing::instrument(level = "trace", skip(self, collector))] fn flush_thread_local_collector( &self, family: u32, @@ -290,12 +291,14 @@ impl WriteBatch { /// Finishes the write batch by returning the new sequence number and the new SST files. This /// writes all outstanding thread local data to disk. + #[tracing::instrument(level = "trace", skip(self))] pub(crate) fn finish(&mut self) -> Result { let mut new_blob_files = Vec::new(); let shared_error = Mutex::new(Ok(())); // First, we flush all thread local collectors to the global collectors. scope(|scope| { + let _span = tracing::trace_span!("flush thread local collectors").entered(); let mut collectors = [const { Vec::new() }; FAMILIES]; for cell in self.thread_locals.iter_mut() { let state = cell.get_mut(); @@ -324,6 +327,8 @@ impl WriteBatch { } }); + let _span = tracing::trace_span!("flush collectors").entered(); + // Now we reduce the global collectors in parallel let mut new_sst_files = take(self.new_sst_files.get_mut()); let shared_new_sst_files = Mutex::new(&mut new_sst_files); @@ -370,6 +375,7 @@ impl WriteBatch { /// Creates a new blob file with the given value. /// Returns a tuple of (sequence number, file). + #[tracing::instrument(level = "trace", skip(self, value), fields(value_len = value.len()))] fn create_blob(&self, value: &[u8]) -> Result<(u32, File)> { let seq = self.current_sequence_number.fetch_add(1, Ordering::SeqCst) + 1; let mut buffer = Vec::new(); @@ -387,6 +393,7 @@ impl WriteBatch { /// Creates a new SST file with the given collector data. /// Returns a tuple of (sequence number, file). + #[tracing::instrument(level = "trace", skip(self, collector_data))] fn create_sst_file( &self, family: u32, diff --git a/turbopack/crates/turbopack-trace-utils/src/tracing_presets.rs b/turbopack/crates/turbopack-trace-utils/src/tracing_presets.rs index e6f4e65781255..b3422fe86a1cf 100644 --- a/turbopack/crates/turbopack-trace-utils/src/tracing_presets.rs +++ b/turbopack/crates/turbopack-trace-utils/src/tracing_presets.rs @@ -71,6 +71,7 @@ pub static TRACING_TURBO_TASKS_TARGETS: Lazy> = Lazy::new(|| { "turbo_tasks_hash=trace", "turbo_tasks_memory=trace", "turbo_tasks_backend=trace", + "turbo_persistence=trace", ], ] .concat() From aaf48349706e34b6e6d10d346ae017fd72eeb937 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Fri, 2 May 2025 10:35:49 +0200 Subject: [PATCH 2/5] more tracing --- turbopack/crates/turbo-persistence/src/db.rs | 9 +++++++++ turbopack/crates/turbo-persistence/src/write_batch.rs | 4 ++++ .../crates/turbo-tasks-backend/src/kv_backing_storage.rs | 1 + 3 files changed, 14 insertions(+) diff --git a/turbopack/crates/turbo-persistence/src/db.rs b/turbopack/crates/turbo-persistence/src/db.rs index 5f9f72d72f514..a892bff7082ba 100644 --- a/turbopack/crates/turbo-persistence/src/db.rs +++ b/turbopack/crates/turbo-persistence/src/db.rs @@ -18,6 +18,7 @@ use lzzzz::lz4::decompress; use memmap2::Mmap; use parking_lot::{Mutex, RwLock}; use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; +use tracing::Span; use crate::{ arc_slice::ArcSlice, @@ -548,6 +549,7 @@ impl TurboPersistence { max_merge_sequence: usize, max_merge_size: usize, ) -> Result<()> { + let _span = tracing::info_span!("compact database").entered(); if self .active_write_operation .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) @@ -653,11 +655,13 @@ impl TurboPersistence { let path = &self.path; let log_mutex = Mutex::new(()); + let span = Span::current(); let result = sst_by_family .into_par_iter() .with_min_len(1) .enumerate() .map(|(family, ssts_with_ranges)| { + let _span = span.clone().entered(); let coverage = total_coverage(&ssts_with_ranges, (0, u64::MAX)); if coverage <= max_coverage { return Ok((Vec::new(), Vec::new())); @@ -710,10 +714,12 @@ impl TurboPersistence { .collect::>(); // Merge SST files + let span = tracing::trace_span!("merge files"); let merge_result = merge_jobs .into_par_iter() .with_min_len(1) .map(|indicies| { + let _span = span.clone().entered(); fn create_sst_file( family: u32, entries: &[LookupEntry], @@ -722,6 +728,7 @@ impl TurboPersistence { path: &Path, seq: u32, ) -> Result<(u32, File)> { + let _span = tracing::trace_span!("write merged sst file").entered(); let builder = StaticSortedFileBuilder::new( family, entries, @@ -865,10 +872,12 @@ impl TurboPersistence { .collect::>(); // Move SST files + let span = tracing::trace_span!("move files"); let mut new_sst_files = move_jobs .into_par_iter() .with_min_len(1) .map(|(index, seq)| { + let _span = span.clone().entered(); let index = ssts_with_ranges[index].index; let sst = &static_sorted_files[index]; let src_path = self.path.join(format!("{:08}.sst", sst.sequence_number())); diff --git a/turbopack/crates/turbo-persistence/src/write_batch.rs b/turbopack/crates/turbo-persistence/src/write_batch.rs index c63c4789b2526..8f359357ee578 100644 --- a/turbopack/crates/turbo-persistence/src/write_batch.rs +++ b/turbopack/crates/turbo-persistence/src/write_batch.rs @@ -315,7 +315,9 @@ impl WriteBatch { for mut collector in thread_local_collectors { let this = &self; let shared_error = &shared_error; + let span = Span::current(); scope.spawn(move |_| { + let _span = span.entered(); if let Err(err) = this.flush_thread_local_collector(family as u32, &mut collector) { @@ -336,6 +338,7 @@ impl WriteBatch { let new_collectors = [(); FAMILIES] .map(|_| Mutex::new(GlobalCollectorState::Unsharded(self.get_new_collector()))); let collectors = replace(&mut self.collectors, new_collectors); + let span = Span::current(); collectors .into_par_iter() .enumerate() @@ -353,6 +356,7 @@ impl WriteBatch { } }) .try_for_each(|(family, mut collector)| { + let _span = span.clone().entered(); let family = family as u32; if !collector.is_empty() { let sst = self.create_sst_file(family, collector.sorted())?; diff --git a/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs b/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs index 90c3b5c0334be..e2939bb76fe86 100644 --- a/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs @@ -208,6 +208,7 @@ impl BackingStorage .into_par_iter() .with_max_len(1) .map(|updates| { + let _span = _span.clone().entered(); let mut max_task_id = 0; let mut task_type_bytes = Vec::new(); From 4ea92d5c0971bb9ad3cade1af10b2f4abc7f2a83 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Fri, 2 May 2025 16:05:01 +0200 Subject: [PATCH 3/5] tracing --- .../turbo-persistence/src/static_sorted_file_builder.rs | 6 ++++++ .../crates/turbo-tasks-backend/src/kv_backing_storage.rs | 2 ++ 2 files changed, 8 insertions(+) diff --git a/turbopack/crates/turbo-persistence/src/static_sorted_file_builder.rs b/turbopack/crates/turbo-persistence/src/static_sorted_file_builder.rs index 03dfc6ef21c0d..8294d56acadd7 100644 --- a/turbopack/crates/turbo-persistence/src/static_sorted_file_builder.rs +++ b/turbopack/crates/turbo-persistence/src/static_sorted_file_builder.rs @@ -103,6 +103,7 @@ impl StaticSortedFileBuilder { } /// Computes a AQMF from the keys of all entries. + #[tracing::instrument(skip_all)] fn compute_aqmf(&mut self, entries: &[E]) { let mut filter = qfilter::Filter::new(entries.len() as u64, AQMF_FALSE_POSITIVE_RATE) // This won't fail as we limit the number of entries per SST file @@ -117,6 +118,7 @@ impl StaticSortedFileBuilder { } /// Computes compression dictionaries from keys and values of all entries + #[tracing::instrument(skip_all)] fn compute_compression_dictionary( &mut self, entries: &[E], @@ -202,6 +204,7 @@ impl StaticSortedFileBuilder { } /// Compute index, key and value blocks. + #[tracing::instrument(skip_all)] fn compute_blocks(&mut self, entries: &[E]) { // TODO implement multi level index // TODO place key and value block near to each other @@ -352,16 +355,19 @@ impl StaticSortedFileBuilder { } /// Compresses an index or key block. + #[tracing::instrument(skip_all)] fn compress_key_block(&self, block: &[u8]) -> (u32, Vec) { self.compress_block(block, &self.key_compression_dictionary) } /// Compresses a value block. + #[tracing::instrument(skip_all)] fn compress_value_block(&self, block: &[u8]) -> (u32, Vec) { self.compress_block(block, &self.value_compression_dictionary) } /// Writes the SST file. + #[tracing::instrument(skip_all)] pub fn write(&self, file: &Path) -> io::Result { let mut file = BufWriter::new(File::create(file)?); // magic number and version diff --git a/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs b/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs index e2939bb76fe86..592dff5b0a6ed 100644 --- a/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs @@ -184,9 +184,11 @@ impl BackingStorage { let _span = tracing::trace_span!("update task data").entered(); process_task_data(snapshots, Some(batch))?; + let span = tracing::trace_span!("flush task data").entered(); [KeySpace::TaskMeta, KeySpace::TaskData] .into_par_iter() .try_for_each(|key_space| { + let _span = span.clone().entered(); // Safety: We already finished all processing of the task data and task // meta unsafe { batch.flush(key_space) } From a84e510236e198080512e88d425ad481cef7c168 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Fri, 2 May 2025 16:12:38 +0200 Subject: [PATCH 4/5] tracing --- turbopack/crates/turbo-persistence/src/write_batch.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/turbopack/crates/turbo-persistence/src/write_batch.rs b/turbopack/crates/turbo-persistence/src/write_batch.rs index 8f359357ee578..0a48ab7474601 100644 --- a/turbopack/crates/turbo-persistence/src/write_batch.rs +++ b/turbopack/crates/turbo-persistence/src/write_batch.rs @@ -236,6 +236,7 @@ impl WriteBatch { /// /// Caller must ensure that no concurrent put or delete operation is happening on the flushed /// family. + #[tracing::instrument(level = "trace", skip(self))] pub unsafe fn flush(&self, family: u32) -> Result<()> { // Flush the thread local collectors to the global collector. let mut collectors = Vec::new(); From 08e89b1b97ae6fedf6c06657a08f82a5159edec1 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Fri, 2 May 2025 18:25:53 +0200 Subject: [PATCH 5/5] setup trace level --- .../src/static_sorted_file_builder.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/turbopack/crates/turbo-persistence/src/static_sorted_file_builder.rs b/turbopack/crates/turbo-persistence/src/static_sorted_file_builder.rs index 8294d56acadd7..e23dbe5e8e31c 100644 --- a/turbopack/crates/turbo-persistence/src/static_sorted_file_builder.rs +++ b/turbopack/crates/turbo-persistence/src/static_sorted_file_builder.rs @@ -103,7 +103,7 @@ impl StaticSortedFileBuilder { } /// Computes a AQMF from the keys of all entries. - #[tracing::instrument(skip_all)] + #[tracing::instrument(level = "trace", skip_all)] fn compute_aqmf(&mut self, entries: &[E]) { let mut filter = qfilter::Filter::new(entries.len() as u64, AQMF_FALSE_POSITIVE_RATE) // This won't fail as we limit the number of entries per SST file @@ -118,7 +118,7 @@ impl StaticSortedFileBuilder { } /// Computes compression dictionaries from keys and values of all entries - #[tracing::instrument(skip_all)] + #[tracing::instrument(level = "trace", skip_all)] fn compute_compression_dictionary( &mut self, entries: &[E], @@ -204,7 +204,7 @@ impl StaticSortedFileBuilder { } /// Compute index, key and value blocks. - #[tracing::instrument(skip_all)] + #[tracing::instrument(level = "trace", skip_all)] fn compute_blocks(&mut self, entries: &[E]) { // TODO implement multi level index // TODO place key and value block near to each other @@ -355,19 +355,19 @@ impl StaticSortedFileBuilder { } /// Compresses an index or key block. - #[tracing::instrument(skip_all)] + #[tracing::instrument(level = "trace", skip_all)] fn compress_key_block(&self, block: &[u8]) -> (u32, Vec) { self.compress_block(block, &self.key_compression_dictionary) } /// Compresses a value block. - #[tracing::instrument(skip_all)] + #[tracing::instrument(level = "trace", skip_all)] fn compress_value_block(&self, block: &[u8]) -> (u32, Vec) { self.compress_block(block, &self.value_compression_dictionary) } /// Writes the SST file. - #[tracing::instrument(skip_all)] + #[tracing::instrument(level = "trace", skip_all)] pub fn write(&self, file: &Path) -> io::Result { let mut file = BufWriter::new(File::create(file)?); // magic number and version