Skip to content

Turbopack: initially shard SST files #78652

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 30, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 72 additions & 26 deletions turbopack/crates/turbo-persistence/src/write_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use byteorder::{WriteBytesExt, BE};
use lzzzz::lz4::{self, ACC_LEVEL_DEFAULT};
use parking_lot::Mutex;
use rayon::{
iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator},
iter::{Either, IndexedParallelIterator, IntoParallelIterator, ParallelIterator},
scope,
};
use smallvec::SmallVec;
Expand Down Expand Up @@ -40,6 +40,10 @@ struct ThreadLocalState<K: StoreKey + Send, const FAMILIES: usize> {
new_blob_files: Vec<(u32, File)>,
}

/// Number of collectors a global collector is split into once it switches to sharded mode.
///
/// Must be a power of two and at least 2, because the shard index is taken from the
/// top bits of the key hash (see `COLLECTOR_SHARD_SHIFT`).
const COLLECTOR_SHARDS: usize = 4;

// Compile-time guard: with a single shard the shift below would be 64 (an overflowing
// shift on a `u64`), and with a non-power-of-two count some shards would never be selected.
const _: () = assert!(
    COLLECTOR_SHARDS.is_power_of_two() && COLLECTOR_SHARDS > 1,
    "COLLECTOR_SHARDS must be a power of two and at least 2"
);

/// Right shift that maps a key hash to a shard index in `0..COLLECTOR_SHARDS`.
///
/// Shifting by this amount keeps only the top `log2(COLLECTOR_SHARDS)` bits of a
/// 64-bit value (the computation is based on `u64::BITS`, so the hash is assumed to be a `u64`).
const COLLECTOR_SHARD_SHIFT: usize =
    u64::BITS as usize - COLLECTOR_SHARDS.trailing_zeros() as usize;

/// The result of a `WriteBatch::finish` operation.
pub(crate) struct FinishResult {
pub(crate) sequence_number: u32,
Expand All @@ -49,6 +53,14 @@ pub(crate) struct FinishResult {
pub(crate) new_blob_files: Vec<(u32, File)>,
}

/// State of one global collector slot of a `WriteBatch` (one slot per family).
///
/// A slot starts out as `Unsharded` and is switched to `Sharded` once its single
/// collector becomes full; at that point the collected entries are redistributed
/// over `COLLECTOR_SHARDS` collectors.
enum GlobalCollectorState<K: StoreKey + Send> {
/// Initial state: a single collector receives all entries.
/// Once that collector is full, the slot switches to sharded mode.
Unsharded(Collector<K>),
/// Sharded mode.
/// Multiple collectors are used, and the one receiving an entry is selected by the
/// first (most significant) bits of the key hash.
Sharded([Collector<K>; COLLECTOR_SHARDS]),
}

/// A write batch.
pub struct WriteBatch<K: StoreKey + Send, const FAMILIES: usize> {
/// The database path
Expand All @@ -58,7 +70,7 @@ pub struct WriteBatch<K: StoreKey + Send, const FAMILIES: usize> {
/// The thread local state.
thread_locals: ThreadLocal<UnsafeCell<ThreadLocalState<K, FAMILIES>>>,
/// Collectors in use. The thread local collectors flush into these when they are full.
collectors: [Mutex<Collector<K>>; FAMILIES],
collectors: [Mutex<GlobalCollectorState<K>>; FAMILIES],
/// The list of new SST files that have been created.
/// Tuple of (sequence number, file).
new_sst_files: Mutex<Vec<(u32, File)>>,
Expand All @@ -78,7 +90,8 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
path,
current_sequence_number: AtomicU32::new(current),
thread_locals: ThreadLocal::new(),
collectors: [(); FAMILIES].map(|_| Mutex::new(Collector::new())),
collectors: [(); FAMILIES]
.map(|_| Mutex::new(GlobalCollectorState::Unsharded(Collector::new()))),
new_sst_files: Mutex::new(Vec::new()),
idle_collectors: Mutex::new(Vec::new()),
idle_thread_local_collectors: Mutex::new(Vec::new()),
Expand Down Expand Up @@ -131,17 +144,39 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
) -> Result<()> {
let mut full_collectors = SmallVec::<[_; 2]>::new();
{
let mut global_collector = self.collectors[usize_from_u32(family)].lock();
let mut global_collector_state = self.collectors[usize_from_u32(family)].lock();
for entry in collector.drain() {
global_collector.add_entry(entry);
if global_collector.is_full() {
full_collectors.push(replace(
&mut *global_collector,
self.idle_collectors
.lock()
.pop()
.unwrap_or_else(|| Collector::new()),
));
match &mut *global_collector_state {
GlobalCollectorState::Unsharded(collector) => {
collector.add_entry(entry);
if collector.is_full() {
// When full, split the entries into shards.
let mut shards: [Collector<K>; 4] =
[(); COLLECTOR_SHARDS].map(|_| Collector::new());
for entry in collector.drain() {
let shard = (entry.key.hash >> COLLECTOR_SHARD_SHIFT) as usize;
shards[shard].add_entry(entry);
}
// There is a rare edge case where all entries are in the same shard,
// and the collector is full after the split.
for collector in shards.iter_mut() {
if collector.is_full() {
full_collectors
.push(replace(&mut *collector, self.get_new_collector()));
}
}
*global_collector_state = GlobalCollectorState::Sharded(shards);
}
}
GlobalCollectorState::Sharded(shards) => {
let shard = (entry.key.hash >> COLLECTOR_SHARD_SHIFT) as usize;
let collector = &mut shards[shard];
collector.add_entry(entry);
if collector.is_full() {
full_collectors
.push(replace(&mut *collector, self.get_new_collector()));
}
}
}
}
}
Expand All @@ -155,6 +190,13 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
Ok(())
}

/// Returns a collector that can be used as a global collector.
///
/// Reuses a collector from `idle_collectors` when one is available and only
/// creates a fresh one with `Collector::new` when the idle list is empty.
fn get_new_collector(&self) -> Collector<K> {
    // The lock guard is a temporary of this statement, so the lock is released
    // before a replacement collector is (possibly) constructed below.
    let recycled = self.idle_collectors.lock().pop();
    recycled.unwrap_or_else(Collector::new)
}

/// Puts a key-value pair into the write batch.
pub fn put(&self, family: u32, key: K, value: ValueBuffer<'_>) -> Result<()> {
let state = self.thread_local_state();
Expand Down Expand Up @@ -217,23 +259,27 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
let mut new_sst_files = take(self.new_sst_files.get_mut());
let shared_new_sst_files = Mutex::new(&mut new_sst_files);

let collectors = replace(
&mut self.collectors,
[(); FAMILIES].map(|_| {
Mutex::new(
self.idle_collectors
.lock()
.pop()
.unwrap_or_else(|| Collector::new()),
)
}),
);
let new_collectors = [(); FAMILIES]
.map(|_| Mutex::new(GlobalCollectorState::Unsharded(self.get_new_collector())));
let collectors = replace(&mut self.collectors, new_collectors);
collectors
.into_par_iter()
.enumerate()
.try_for_each(|(family, collector)| {
.flat_map(|(family, state)| {
let collector = state.into_inner();
match collector {
GlobalCollectorState::Unsharded(collector) => {
Either::Left([(family, collector)].into_par_iter())
}
GlobalCollectorState::Sharded(shards) => Either::Right(
shards
.into_par_iter()
.map(move |collector| (family, collector)),
),
}
})
.try_for_each(|(family, mut collector)| {
let family = family as u32;
let mut collector = collector.into_inner();
if !collector.is_empty() {
let sst = self.create_sst_file(family, collector.sorted())?;
collector.clear();
Expand Down
Loading