Skip to content

Turbopack: use small thread local collector that flushes to global collector #78343

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 6 additions & 10 deletions turbopack/crates/turbo-persistence/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,19 @@ use crate::{

/// A collector accumulates entries that should be eventually written to a file. It keeps track of
/// count and size of the entries to decide when it's "full". Accessing the entries sorts them.
pub struct Collector<K: StoreKey> {
pub struct Collector<K: StoreKey, const SIZE_SHIFT: usize = 0> {
total_key_size: usize,
total_value_size: usize,
entries: Vec<CollectorEntry<K>>,
}

impl<K: StoreKey> Collector<K> {
impl<K: StoreKey, const SIZE_SHIFT: usize> Collector<K, SIZE_SHIFT> {
/// Creates a new collector. Note that this allocates the full capacity for the entries.
pub fn new() -> Self {
Self {
total_key_size: 0,
total_value_size: 0,
entries: Vec::with_capacity(MAX_ENTRIES_PER_INITIAL_FILE),
entries: Vec::with_capacity(MAX_ENTRIES_PER_INITIAL_FILE >> SIZE_SHIFT),
}
}

Expand All @@ -32,8 +32,9 @@ impl<K: StoreKey> Collector<K> {

/// Returns true if the collector is full.
pub fn is_full(&self) -> bool {
self.entries.len() >= MAX_ENTRIES_PER_INITIAL_FILE
|| self.total_key_size + self.total_value_size > DATA_THRESHOLD_PER_INITIAL_FILE
self.entries.len() >= MAX_ENTRIES_PER_INITIAL_FILE >> SIZE_SHIFT
|| self.total_key_size + self.total_value_size
> DATA_THRESHOLD_PER_INITIAL_FILE >> SIZE_SHIFT
}

/// Adds a normal key-value pair to the collector.
Expand Down Expand Up @@ -110,9 +111,4 @@ impl<K: StoreKey> Collector<K> {
self.total_value_size = 0;
self.entries.drain(..)
}

/// Returns the number of entries in the collector.
pub fn len(&self) -> usize {
self.entries.len()
}
}
4 changes: 4 additions & 0 deletions turbopack/crates/turbo-persistence/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ pub const DATA_THRESHOLD_PER_INITIAL_FILE: usize = 256 * 1024 * 1024;
/// Finish file when total amount of data exceeds this
pub const DATA_THRESHOLD_PER_COMPACTED_FILE: usize = 256 * 1024 * 1024;

/// Reduction factor (expressed as a right bit shift) for the size of the thread-local buffer,
/// applied to MAX_ENTRIES_PER_INITIAL_FILE and DATA_THRESHOLD_PER_INITIAL_FILE.
pub const THREAD_LOCAL_SIZE_SHIFT: usize = 7;

/// Maximum RAM bytes for AQMF cache
pub const AQMF_CACHE_SIZE: u64 = 300 * 1024 * 1024;
pub const AQMF_AVG_SIZE: usize = 37399;
Expand Down
220 changes: 115 additions & 105 deletions turbopack/crates/turbo-persistence/src/write_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{
cell::UnsafeCell,
fs::File,
io::Write,
mem::{replace, swap},
mem::{replace, take},
path::PathBuf,
sync::atomic::{AtomicU32, Ordering},
};
Expand All @@ -13,13 +13,18 @@ use lzzzz::lz4::{self, ACC_LEVEL_DEFAULT};
use parking_lot::Mutex;
use rayon::{
iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator},
scope, Scope,
scope,
};
use smallvec::SmallVec;
use thread_local::ThreadLocal;

use crate::{
collector::Collector, collector_entry::CollectorEntry, constants::MAX_MEDIUM_VALUE_SIZE,
key::StoreKey, static_sorted_file_builder::StaticSortedFileBuilder, ValueBuffer,
collector::Collector,
collector_entry::CollectorEntry,
constants::{MAX_MEDIUM_VALUE_SIZE, THREAD_LOCAL_SIZE_SHIFT},
key::StoreKey,
static_sorted_file_builder::StaticSortedFileBuilder,
ValueBuffer,
};

/// The thread local state of a `WriteBatch`. `FAMILIES` should fit within a `u32`.
Expand All @@ -29,10 +34,7 @@ use crate::{
// `min_generic_const_args` feature.
struct ThreadLocalState<K: StoreKey + Send, const FAMILIES: usize> {
/// The collectors for each family.
collectors: [Option<Collector<K>>; FAMILIES],
/// The list of new SST files that have been created.
/// Tuple of (sequence number, file).
new_sst_files: Vec<(u32, File)>,
collectors: [Option<Collector<K, THREAD_LOCAL_SIZE_SHIFT>>; FAMILIES],
/// The list of new blob files that have been created.
/// Tuple of (sequence number, file).
new_blob_files: Vec<(u32, File)>,
Expand All @@ -55,8 +57,15 @@ pub struct WriteBatch<K: StoreKey + Send, const FAMILIES: usize> {
current_sequence_number: AtomicU32,
/// The thread local state.
thread_locals: ThreadLocal<UnsafeCell<ThreadLocalState<K, FAMILIES>>>,
/// Collectors that are currently unused, but have memory preallocated.
/// Collectors in use. The thread local collectors flush into these when they are full.
collectors: [Mutex<Collector<K>>; FAMILIES],
/// The list of new SST files that have been created.
/// Tuple of (sequence number, file).
new_sst_files: Mutex<Vec<(u32, File)>>,
/// Collectors that are currently unused, but have memory preallocated.
idle_collectors: Mutex<Vec<Collector<K>>>,
/// Thread-local collectors that are currently unused, but have memory preallocated.
idle_thread_local_collectors: Mutex<Vec<Collector<K, THREAD_LOCAL_SIZE_SHIFT>>>,
}

impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
Expand All @@ -69,7 +78,10 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
path,
current_sequence_number: AtomicU32::new(current),
thread_locals: ThreadLocal::new(),
collectors: [(); FAMILIES].map(|_| Mutex::new(Collector::new())),
new_sst_files: Mutex::new(Vec::new()),
idle_collectors: Mutex::new(Vec::new()),
idle_thread_local_collectors: Mutex::new(Vec::new()),
}
}

Expand All @@ -86,7 +98,6 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
let cell = self.thread_locals.get_or(|| {
UnsafeCell::new(ThreadLocalState {
collectors: [const { None }; FAMILIES],
new_sst_files: Vec::new(),
new_blob_files: Vec::new(),
})
});
Expand All @@ -95,31 +106,59 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
}

/// Returns the collector for a family for the current thread.
fn collector_mut<'l>(
fn thread_local_collector_mut<'l>(
&self,
state: &'l mut ThreadLocalState<K, FAMILIES>,
family: u32,
) -> Result<&'l mut Collector<K>> {
let family_idx = usize_from_u32(family);
debug_assert!(family_idx < FAMILIES);
let collector = state.collectors[family_idx].get_or_insert_with(|| {
self.idle_collectors
) -> Result<&'l mut Collector<K, THREAD_LOCAL_SIZE_SHIFT>> {
debug_assert!(usize_from_u32(family) < FAMILIES);
let collector = state.collectors[usize_from_u32(family)].get_or_insert_with(|| {
self.idle_thread_local_collectors
.lock()
.pop()
.unwrap_or_else(|| Collector::new())
});
if collector.is_full() {
let sst = self.create_sst_file(family, collector.sorted())?;
collector.clear();
state.new_sst_files.push(sst);
self.flush_thread_local_collector(family, collector)?;
}
Ok(collector)
}

fn flush_thread_local_collector(
&self,
family: u32,
collector: &mut Collector<K, THREAD_LOCAL_SIZE_SHIFT>,
) -> Result<()> {
let mut full_collectors = SmallVec::<[_; 2]>::new();
{
let mut global_collector = self.collectors[usize_from_u32(family)].lock();
for entry in collector.drain() {
global_collector.add_entry(entry);
if global_collector.is_full() {
full_collectors.push(replace(
&mut *global_collector,
self.idle_collectors
.lock()
.pop()
.unwrap_or_else(|| Collector::new()),
));
}
}
}
for mut global_collector in full_collectors {
// When the global collector is full, we create a new SST file.
let sst = self.create_sst_file(family, global_collector.sorted())?;
global_collector.clear();
self.new_sst_files.lock().push(sst);
self.idle_collectors.lock().push(global_collector);
}
Ok(())
}

/// Puts a key-value pair into the write batch.
pub fn put(&self, family: u32, key: K, value: ValueBuffer<'_>) -> Result<()> {
let state = self.thread_local_state();
let collector = self.collector_mut(state, family)?;
let collector = self.thread_local_collector_mut(state, family)?;
if value.len() <= MAX_MEDIUM_VALUE_SIZE {
collector.put(key, value);
} else {
Expand All @@ -133,106 +172,77 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
/// Puts a delete operation into the write batch.
pub fn delete(&self, family: u32, key: K) -> Result<()> {
let state = self.thread_local_state();
let collector = self.collector_mut(state, family)?;
let collector = self.thread_local_collector_mut(state, family)?;
collector.delete(key);
Ok(())
}

/// Finishes the write batch by returning the new sequence number and the new SST files. This
/// writes all outstanding thread local data to disk.
pub(crate) fn finish(&mut self) -> Result<FinishResult> {
let mut new_sst_files = Vec::new();
let mut new_blob_files = Vec::new();
let mut all_collectors = [(); FAMILIES].map(|_| Vec::new());
for cell in self.thread_locals.iter_mut() {
let state = cell.get_mut();
new_sst_files.append(&mut state.new_sst_files);
new_blob_files.append(&mut state.new_blob_files);
for (family, global_collector) in all_collectors.iter_mut().enumerate() {
if let Some(collector) = state.collectors[family].take() {
if !collector.is_empty() {
global_collector.push(Some(collector));
}
}
}
}
let shared_new_sst_files = Mutex::new(&mut new_sst_files);
let shared_error = Mutex::new(Ok(()));

// First, we flush all thread local collectors to the global collectors.
scope(|scope| {
fn handle_done_collector<'scope, K: StoreKey + Send + Sync, const FAMILIES: usize>(
this: &'scope WriteBatch<K, FAMILIES>,
scope: &Scope<'scope>,
family: u32,
mut collector: Collector<K>,
shared_new_sst_files: &'scope Mutex<&mut Vec<(u32, File)>>,
shared_error: &'scope Mutex<Result<()>>,
) {
scope.spawn(
move |_| match this.create_sst_file(family, collector.sorted()) {
Ok(sst) => {
collector.clear();
this.idle_collectors.lock().push(collector);
shared_new_sst_files.lock().push(sst);
let mut collectors = [const { Vec::new() }; FAMILIES];
for cell in self.thread_locals.iter_mut() {
let state = cell.get_mut();
new_blob_files.append(&mut state.new_blob_files);
for (family, thread_local_collector) in state.collectors.iter_mut().enumerate() {
if let Some(collector) = thread_local_collector.take() {
if !collector.is_empty() {
collectors[family].push(collector);
}
Err(err) => {
}
}
}
for (family, thread_local_collectors) in collectors.into_iter().enumerate() {
for mut collector in thread_local_collectors {
let this = &self;
let shared_error = &shared_error;
scope.spawn(move |_| {
if let Err(err) =
this.flush_thread_local_collector(family as u32, &mut collector)
{
*shared_error.lock() = Err(err);
}
},
);
this.idle_thread_local_collectors.lock().push(collector);
});
}
}

all_collectors
.into_par_iter()
.enumerate()
.for_each(|(family_idx, collectors)| {
let family = u32::try_from(family_idx).unwrap();
let final_collector = collectors.into_par_iter().reduce(
|| None,
|a, b| match (a, b) {
(Some(mut a), Some(mut b)) => {
if a.len() < b.len() {
swap(&mut a, &mut b);
}
for entry in b.drain() {
if a.is_full() {
let full_collector = replace(
&mut a,
self.idle_collectors
.lock()
.pop()
.unwrap_or_else(|| Collector::new()),
);
handle_done_collector(
self,
scope,
family,
full_collector,
&shared_new_sst_files,
&shared_error,
);
}
a.add_entry(entry);
}
self.idle_collectors.lock().push(b);
Some(a)
}
(Some(a), None) => Some(a),
(None, Some(b)) => Some(b),
(None, None) => None,
},
);
if let Some(collector) = final_collector {
handle_done_collector(
self,
scope,
family,
collector,
&shared_new_sst_files,
&shared_error,
);
}
});
});

// Now we reduce the global collectors in parallel
let mut new_sst_files = take(self.new_sst_files.get_mut());
let shared_new_sst_files = Mutex::new(&mut new_sst_files);

let collectors = replace(
&mut self.collectors,
[(); FAMILIES].map(|_| {
Mutex::new(
self.idle_collectors
.lock()
.pop()
.unwrap_or_else(|| Collector::new()),
)
}),
);
collectors
.into_par_iter()
.enumerate()
.try_for_each(|(family, collector)| {
let family = family as u32;
let mut collector = collector.into_inner();
if !collector.is_empty() {
let sst = self.create_sst_file(family, collector.sorted())?;
collector.clear();
self.idle_collectors.lock().push(collector);
shared_new_sst_files.lock().push(sst);
}
anyhow::Ok(())
})?;

shared_error.into_inner()?;
let seq = self.current_sequence_number.load(Ordering::SeqCst);
new_sst_files.sort_by_key(|(seq, _)| *seq);
Expand Down
Loading