diff --git a/turbopack/crates/turbo-persistence/src/collector.rs b/turbopack/crates/turbo-persistence/src/collector.rs
index 82e82b9cf9201..86ac0daab07a2 100644
--- a/turbopack/crates/turbo-persistence/src/collector.rs
+++ b/turbopack/crates/turbo-persistence/src/collector.rs
@@ -9,19 +9,19 @@ use crate::{
 
 /// A collector accumulates entries that should be eventually written to a file. It keeps track of
 /// count and size of the entries to decide when it's "full". Accessing the entries sorts them.
-pub struct Collector<K: StoreKey> {
+pub struct Collector<K: StoreKey, const SIZE_SHIFT: usize = 0> {
     total_key_size: usize,
     total_value_size: usize,
     entries: Vec<CollectorEntry<K>>,
 }
 
-impl<K: StoreKey> Collector<K> {
+impl<K: StoreKey, const SIZE_SHIFT: usize> Collector<K, SIZE_SHIFT> {
     /// Creates a new collector. Note that this allocates the full capacity for the entries.
     pub fn new() -> Self {
         Self {
             total_key_size: 0,
             total_value_size: 0,
-            entries: Vec::with_capacity(MAX_ENTRIES_PER_INITIAL_FILE),
+            entries: Vec::with_capacity(MAX_ENTRIES_PER_INITIAL_FILE >> SIZE_SHIFT),
         }
     }
 
@@ -32,8 +32,9 @@ impl<K: StoreKey> Collector<K> {
 
     /// Returns true if the collector is full.
     pub fn is_full(&self) -> bool {
-        self.entries.len() >= MAX_ENTRIES_PER_INITIAL_FILE
-            || self.total_key_size + self.total_value_size > DATA_THRESHOLD_PER_INITIAL_FILE
+        self.entries.len() >= MAX_ENTRIES_PER_INITIAL_FILE >> SIZE_SHIFT
+            || self.total_key_size + self.total_value_size
+                > DATA_THRESHOLD_PER_INITIAL_FILE >> SIZE_SHIFT
     }
 
     /// Adds a normal key-value pair to the collector.
@@ -110,9 +111,4 @@ impl<K: StoreKey> Collector<K> {
         self.total_value_size = 0;
         self.entries.drain(..)
     }
-
-    /// Returns the number of entries in the collector.
-    pub fn len(&self) -> usize {
-        self.entries.len()
-    }
 }
diff --git a/turbopack/crates/turbo-persistence/src/constants.rs b/turbopack/crates/turbo-persistence/src/constants.rs
index de67de1b8e084..9a46fa00a31e3 100644
--- a/turbopack/crates/turbo-persistence/src/constants.rs
+++ b/turbopack/crates/turbo-persistence/src/constants.rs
@@ -17,6 +17,10 @@ pub const DATA_THRESHOLD_PER_INITIAL_FILE: usize = 256 * 1024 * 1024;
 /// Finish file when total amount of data exceeds this
 pub const DATA_THRESHOLD_PER_COMPACTED_FILE: usize = 256 * 1024 * 1024;
 
+/// Reduction factor (as a bit shift) for the size of the thread-local collector buffers, applied
+/// to MAX_ENTRIES_PER_INITIAL_FILE and DATA_THRESHOLD_PER_INITIAL_FILE.
+pub const THREAD_LOCAL_SIZE_SHIFT: usize = 7;
+
 /// Maximum RAM bytes for AQMF cache
 pub const AQMF_CACHE_SIZE: u64 = 300 * 1024 * 1024;
 pub const AQMF_AVG_SIZE: usize = 37399;
diff --git a/turbopack/crates/turbo-persistence/src/write_batch.rs b/turbopack/crates/turbo-persistence/src/write_batch.rs
index 09fa9553ef34d..6a62c4cdb0a93 100644
--- a/turbopack/crates/turbo-persistence/src/write_batch.rs
+++ b/turbopack/crates/turbo-persistence/src/write_batch.rs
@@ -2,7 +2,7 @@ use std::{
     cell::UnsafeCell,
     fs::File,
     io::Write,
-    mem::{replace, swap},
+    mem::{replace, take},
     path::PathBuf,
     sync::atomic::{AtomicU32, Ordering},
 };
@@ -13,13 +13,18 @@ use lzzzz::lz4::{self, ACC_LEVEL_DEFAULT};
 use parking_lot::Mutex;
 use rayon::{
     iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator},
-    scope, Scope,
+    scope,
 };
+use smallvec::SmallVec;
 use thread_local::ThreadLocal;
 
 use crate::{
-    collector::Collector, collector_entry::CollectorEntry, constants::MAX_MEDIUM_VALUE_SIZE,
-    key::StoreKey, static_sorted_file_builder::StaticSortedFileBuilder, ValueBuffer,
+    collector::Collector,
+    collector_entry::CollectorEntry,
+    constants::{MAX_MEDIUM_VALUE_SIZE, THREAD_LOCAL_SIZE_SHIFT},
+    key::StoreKey,
+    static_sorted_file_builder::StaticSortedFileBuilder,
+    ValueBuffer,
 };
 
 /// The thread local state of a `WriteBatch`. `FAMILIES` should fit within a `u32`.
@@ -29,10 +34,7 @@ use crate::{
 // `min_generic_const_args` feature.
 struct ThreadLocalState<K: StoreKey, const FAMILIES: usize> {
     /// The collectors for each family.
-    collectors: [Option<Collector<K>>; FAMILIES],
-    /// The list of new SST files that have been created.
-    /// Tuple of (sequence number, file).
-    new_sst_files: Vec<(u32, File)>,
+    collectors: [Option<Collector<K, THREAD_LOCAL_SIZE_SHIFT>>; FAMILIES],
     /// The list of new blob files that have been created.
     /// Tuple of (sequence number, file).
     new_blob_files: Vec<(u32, File)>,
@@ -55,8 +57,15 @@ pub struct WriteBatch<K: StoreKey, const FAMILIES: usize> {
     current_sequence_number: AtomicU32,
     /// The thread local state.
     thread_locals: ThreadLocal<UnsafeCell<ThreadLocalState<K, FAMILIES>>>,
-    /// Collectors are are current unused, but have memory preallocated.
+    /// Collectors in use. The thread local collectors flush into these when they are full.
+    collectors: [Mutex<Collector<K>>; FAMILIES],
+    /// The list of new SST files that have been created.
+    /// Tuple of (sequence number, file).
+    new_sst_files: Mutex<Vec<(u32, File)>>,
+    /// Collectors that are currently unused, but have memory preallocated.
     idle_collectors: Mutex<Vec<Collector<K>>>,
+    /// Thread-local collectors that are currently unused, but have memory preallocated.
+    idle_thread_local_collectors: Mutex<Vec<Collector<K, THREAD_LOCAL_SIZE_SHIFT>>>,
 }
 
 impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
@@ -69,7 +78,10 @@
             path,
             current_sequence_number: AtomicU32::new(current),
             thread_locals: ThreadLocal::new(),
+            collectors: [(); FAMILIES].map(|_| Mutex::new(Collector::new())),
+            new_sst_files: Mutex::new(Vec::new()),
             idle_collectors: Mutex::new(Vec::new()),
+            idle_thread_local_collectors: Mutex::new(Vec::new()),
         }
     }
 
@@ -86,7 +98,6 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
         let cell = self.thread_locals.get_or(|| {
             UnsafeCell::new(ThreadLocalState {
                 collectors: [const { None }; FAMILIES],
-                new_sst_files: Vec::new(),
                 new_blob_files: Vec::new(),
             })
         });
@@ -95,31 +106,59 @@
     }
 
     /// Returns the collector for a family for the current thread.
-    fn collector_mut<'l>(
+    fn thread_local_collector_mut<'l>(
         &self,
         state: &'l mut ThreadLocalState<K, FAMILIES>,
         family: u32,
-    ) -> Result<&'l mut Collector<K>> {
-        let family_idx = usize_from_u32(family);
-        debug_assert!(family_idx < FAMILIES);
-        let collector = state.collectors[family_idx].get_or_insert_with(|| {
-            self.idle_collectors
+    ) -> Result<&'l mut Collector<K, THREAD_LOCAL_SIZE_SHIFT>> {
+        debug_assert!(usize_from_u32(family) < FAMILIES);
+        let collector = state.collectors[usize_from_u32(family)].get_or_insert_with(|| {
+            self.idle_thread_local_collectors
                 .lock()
                 .pop()
                 .unwrap_or_else(|| Collector::new())
         });
         if collector.is_full() {
-            let sst = self.create_sst_file(family, collector.sorted())?;
-            collector.clear();
-            state.new_sst_files.push(sst);
+            self.flush_thread_local_collector(family, collector)?;
         }
         Ok(collector)
     }
 
+    fn flush_thread_local_collector(
+        &self,
+        family: u32,
+        collector: &mut Collector<K, THREAD_LOCAL_SIZE_SHIFT>,
+    ) -> Result<()> {
+        let mut full_collectors = SmallVec::<[_; 2]>::new();
+        {
+            let mut global_collector = self.collectors[usize_from_u32(family)].lock();
+            for entry in collector.drain() {
+                global_collector.add_entry(entry);
+                if global_collector.is_full() {
+                    full_collectors.push(replace(
+                        &mut *global_collector,
+                        self.idle_collectors
+                            .lock()
+                            .pop()
+                            .unwrap_or_else(|| Collector::new()),
+                    ));
+                }
+            }
+        }
+        for mut global_collector in full_collectors {
+            // When the global collector is full, we create a new SST file.
+            let sst = self.create_sst_file(family, global_collector.sorted())?;
+            global_collector.clear();
+            self.new_sst_files.lock().push(sst);
+            self.idle_collectors.lock().push(global_collector);
+        }
+        Ok(())
+    }
+
     /// Puts a key-value pair into the write batch.
     pub fn put(&self, family: u32, key: K, value: ValueBuffer<'_>) -> Result<()> {
         let state = self.thread_local_state();
-        let collector = self.collector_mut(state, family)?;
+        let collector = self.thread_local_collector_mut(state, family)?;
         if value.len() <= MAX_MEDIUM_VALUE_SIZE {
             collector.put(key, value);
         } else {
@@ -133,7 +172,7 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
     /// Puts a delete operation into the write batch.
     pub fn delete(&self, family: u32, key: K) -> Result<()> {
         let state = self.thread_local_state();
-        let collector = self.collector_mut(state, family)?;
+        let collector = self.thread_local_collector_mut(state, family)?;
         collector.delete(key);
         Ok(())
     }
@@ -141,98 +180,69 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
     /// Finishes the write batch by returning the new sequence number and the new SST files. This
     /// writes all outstanding thread local data to disk.
     pub(crate) fn finish(&mut self) -> Result<FinishResult> {
-        let mut new_sst_files = Vec::new();
         let mut new_blob_files = Vec::new();
-        let mut all_collectors = [(); FAMILIES].map(|_| Vec::new());
-        for cell in self.thread_locals.iter_mut() {
-            let state = cell.get_mut();
-            new_sst_files.append(&mut state.new_sst_files);
-            new_blob_files.append(&mut state.new_blob_files);
-            for (family, global_collector) in all_collectors.iter_mut().enumerate() {
-                if let Some(collector) = state.collectors[family].take() {
-                    if !collector.is_empty() {
-                        global_collector.push(Some(collector));
-                    }
-                }
-            }
-        }
-        let shared_new_sst_files = Mutex::new(&mut new_sst_files);
         let shared_error = Mutex::new(Ok(()));
+
+        // First, we flush all thread local collectors to the global collectors.
         scope(|scope| {
-            fn handle_done_collector<'scope, K: StoreKey + Send + Sync, const FAMILIES: usize>(
-                this: &'scope WriteBatch<K, FAMILIES>,
-                scope: &Scope<'scope>,
-                family: u32,
-                mut collector: Collector<K>,
-                shared_new_sst_files: &'scope Mutex<&mut Vec<(u32, File)>>,
-                shared_error: &'scope Mutex<Result<()>>,
-            ) {
-                scope.spawn(
-                    move |_| match this.create_sst_file(family, collector.sorted()) {
-                        Ok(sst) => {
-                            collector.clear();
-                            this.idle_collectors.lock().push(collector);
-                            shared_new_sst_files.lock().push(sst);
+            let mut collectors = [const { Vec::new() }; FAMILIES];
+            for cell in self.thread_locals.iter_mut() {
+                let state = cell.get_mut();
+                new_blob_files.append(&mut state.new_blob_files);
+                for (family, thread_local_collector) in state.collectors.iter_mut().enumerate() {
+                    if let Some(collector) = thread_local_collector.take() {
+                        if !collector.is_empty() {
+                            collectors[family].push(collector);
                         }
-                        Err(err) => {
+                    }
+                }
+            }
+            for (family, thread_local_collectors) in collectors.into_iter().enumerate() {
+                for mut collector in thread_local_collectors {
+                    let this = &self;
+                    let shared_error = &shared_error;
+                    scope.spawn(move |_| {
+                        if let Err(err) =
+                            this.flush_thread_local_collector(family as u32, &mut collector)
+                        {
                             *shared_error.lock() = Err(err);
                         }
-                    },
-                );
+                        this.idle_thread_local_collectors.lock().push(collector);
+                    });
+                }
             }
-
-            all_collectors
-                .into_par_iter()
-                .enumerate()
-                .for_each(|(family_idx, collectors)| {
-                    let family = u32::try_from(family_idx).unwrap();
-                    let final_collector = collectors.into_par_iter().reduce(
-                        || None,
-                        |a, b| match (a, b) {
-                            (Some(mut a), Some(mut b)) => {
-                                if a.len() < b.len() {
-                                    swap(&mut a, &mut b);
-                                }
-                                for entry in b.drain() {
-                                    if a.is_full() {
-                                        let full_collector = replace(
-                                            &mut a,
-                                            self.idle_collectors
-                                                .lock()
-                                                .pop()
-                                                .unwrap_or_else(|| Collector::new()),
-                                        );
-                                        handle_done_collector(
-                                            self,
-                                            scope,
-                                            family,
-                                            full_collector,
-                                            &shared_new_sst_files,
-                                            &shared_error,
-                                        );
-                                    }
-                                    a.add_entry(entry);
-                                }
-                                self.idle_collectors.lock().push(b);
-                                Some(a)
-                            }
-                            (Some(a), None) => Some(a),
-                            (None, Some(b)) => Some(b),
-                            (None, None) => None,
-                        },
-                    );
-                    if let Some(collector) = final_collector {
-                        handle_done_collector(
-                            self,
-                            scope,
-                            family,
-                            collector,
-                            &shared_new_sst_files,
-                            &shared_error,
-                        );
-                    }
-                });
         });
+
+        // Now we reduce the global collectors in parallel
+        let mut new_sst_files = take(self.new_sst_files.get_mut());
+        let shared_new_sst_files = Mutex::new(&mut new_sst_files);
+
+        let collectors = replace(
+            &mut self.collectors,
+            [(); FAMILIES].map(|_| {
+                Mutex::new(
+                    self.idle_collectors
+                        .lock()
+                        .pop()
+                        .unwrap_or_else(|| Collector::new()),
+                )
+            }),
+        );
+        collectors
+            .into_par_iter()
+            .enumerate()
+            .try_for_each(|(family, collector)| {
+                let family = family as u32;
+                let mut collector = collector.into_inner();
+                if !collector.is_empty() {
+                    let sst = self.create_sst_file(family, collector.sorted())?;
+                    collector.clear();
+                    self.idle_collectors.lock().push(collector);
+                    shared_new_sst_files.lock().push(sst);
+                }
+                anyhow::Ok(())
+            })?;
+
         shared_error.into_inner()?;
         let seq = self.current_sequence_number.load(Ordering::SeqCst);
         new_sst_files.sort_by_key(|(seq, _)| *seq);