diff --git a/turbopack/crates/turbo-persistence/src/write_batch.rs b/turbopack/crates/turbo-persistence/src/write_batch.rs index 6a62c4cdb0a93..bf4692097f65f 100644 --- a/turbopack/crates/turbo-persistence/src/write_batch.rs +++ b/turbopack/crates/turbo-persistence/src/write_batch.rs @@ -12,7 +12,7 @@ use byteorder::{WriteBytesExt, BE}; use lzzzz::lz4::{self, ACC_LEVEL_DEFAULT}; use parking_lot::Mutex; use rayon::{ - iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}, + iter::{Either, IndexedParallelIterator, IntoParallelIterator, ParallelIterator}, scope, }; use smallvec::SmallVec; @@ -40,6 +40,10 @@ struct ThreadLocalState { new_blob_files: Vec<(u32, File)>, } +const COLLECTOR_SHARDS: usize = 4; +const COLLECTOR_SHARD_SHIFT: usize = + u64::BITS as usize - COLLECTOR_SHARDS.trailing_zeros() as usize; + /// The result of a `WriteBatch::finish` operation. pub(crate) struct FinishResult { pub(crate) sequence_number: u32, @@ -49,6 +53,14 @@ pub(crate) struct FinishResult { pub(crate) new_blob_files: Vec<(u32, File)>, } +enum GlobalCollectorState { + /// Initial state. Single collector. Once the collector is full, we switch to sharded mode. + Unsharded(Collector), + /// Sharded mode. + /// We use multiple collectors, and select one based on the first bits of the key hash. + Sharded([Collector; COLLECTOR_SHARDS]), +} + /// A write batch. pub struct WriteBatch { /// The database path @@ -58,7 +70,7 @@ pub struct WriteBatch { /// The thread local state. thread_locals: ThreadLocal>>, /// Collectors in use. The thread local collectors flush into these when they are full. - collectors: [Mutex>; FAMILIES], + collectors: [Mutex>; FAMILIES], /// The list of new SST files that have been created. /// Tuple of (sequence number, file). 
new_sst_files: Mutex>, @@ -78,7 +90,8 @@ impl WriteBatch { path, current_sequence_number: AtomicU32::new(current), thread_locals: ThreadLocal::new(), - collectors: [(); FAMILIES].map(|_| Mutex::new(Collector::new())), + collectors: [(); FAMILIES] + .map(|_| Mutex::new(GlobalCollectorState::Unsharded(Collector::new()))), new_sst_files: Mutex::new(Vec::new()), idle_collectors: Mutex::new(Vec::new()), idle_thread_local_collectors: Mutex::new(Vec::new()), @@ -131,17 +144,39 @@ impl WriteBatch { ) -> Result<()> { let mut full_collectors = SmallVec::<[_; 2]>::new(); { - let mut global_collector = self.collectors[usize_from_u32(family)].lock(); + let mut global_collector_state = self.collectors[usize_from_u32(family)].lock(); for entry in collector.drain() { - global_collector.add_entry(entry); - if global_collector.is_full() { - full_collectors.push(replace( - &mut *global_collector, - self.idle_collectors - .lock() - .pop() - .unwrap_or_else(|| Collector::new()), - )); + match &mut *global_collector_state { + GlobalCollectorState::Unsharded(collector) => { + collector.add_entry(entry); + if collector.is_full() { + // When full, split the entries into shards. + let mut shards: [Collector; COLLECTOR_SHARDS] = + [(); COLLECTOR_SHARDS].map(|_| Collector::new()); + for entry in collector.drain() { + let shard = (entry.key.hash >> COLLECTOR_SHARD_SHIFT) as usize; + shards[shard].add_entry(entry); + } + // There is a rare edge case where all entries are in the same shard, + // and the collector is full after the split. 
+ for collector in shards.iter_mut() { + if collector.is_full() { + full_collectors + .push(replace(&mut *collector, self.get_new_collector())); + } + } + *global_collector_state = GlobalCollectorState::Sharded(shards); + } + } + GlobalCollectorState::Sharded(shards) => { + let shard = (entry.key.hash >> COLLECTOR_SHARD_SHIFT) as usize; + let collector = &mut shards[shard]; + collector.add_entry(entry); + if collector.is_full() { + full_collectors + .push(replace(&mut *collector, self.get_new_collector())); + } + } } } } @@ -155,6 +190,13 @@ impl WriteBatch { Ok(()) } + fn get_new_collector(&self) -> Collector { + self.idle_collectors + .lock() + .pop() + .unwrap_or_else(|| Collector::new()) + } + /// Puts a key-value pair into the write batch. pub fn put(&self, family: u32, key: K, value: ValueBuffer<'_>) -> Result<()> { let state = self.thread_local_state(); @@ -217,23 +259,27 @@ impl WriteBatch { let mut new_sst_files = take(self.new_sst_files.get_mut()); let shared_new_sst_files = Mutex::new(&mut new_sst_files); - let collectors = replace( - &mut self.collectors, - [(); FAMILIES].map(|_| { - Mutex::new( - self.idle_collectors - .lock() - .pop() - .unwrap_or_else(|| Collector::new()), - ) - }), - ); + let new_collectors = [(); FAMILIES] + .map(|_| Mutex::new(GlobalCollectorState::Unsharded(self.get_new_collector()))); + let collectors = replace(&mut self.collectors, new_collectors); collectors .into_par_iter() .enumerate() - .try_for_each(|(family, collector)| { + .flat_map(|(family, state)| { + let collector = state.into_inner(); + match collector { + GlobalCollectorState::Unsharded(collector) => { + Either::Left([(family, collector)].into_par_iter()) + } + GlobalCollectorState::Sharded(shards) => Either::Right( + shards + .into_par_iter() + .map(move |collector| (family, collector)), + ), + } + }) + .try_for_each(|(family, mut collector)| { let family = family as u32; - let mut collector = collector.into_inner(); if !collector.is_empty() { let sst = 
self.create_sst_file(family, collector.sorted())?; collector.clear();