diff --git a/Cargo.lock b/Cargo.lock
index 23f125873ee98..8a970397ac972 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -9334,6 +9334,7 @@ dependencies = [
  "smallvec",
  "tempfile",
  "thread_local",
+ "tracing",
  "twox-hash 2.1.0",
  "zstd",
 ]
diff --git a/turbopack/crates/turbo-persistence/Cargo.toml b/turbopack/crates/turbo-persistence/Cargo.toml
index abd656d3fcb87..c0ae5159467a8 100644
--- a/turbopack/crates/turbo-persistence/Cargo.toml
+++ b/turbopack/crates/turbo-persistence/Cargo.toml
@@ -25,6 +25,7 @@
 rustc-hash = { workspace = true }
 serde = { workspace = true }
 smallvec = { workspace = true}
 thread_local = { workspace = true }
+tracing = { workspace = true }
 twox-hash = { version = "2.0.1", features = ["xxhash64"] }
 zstd = { version = "0.13.2", features = ["zdict_builder"] }
diff --git a/turbopack/crates/turbo-persistence/src/lib.rs b/turbopack/crates/turbo-persistence/src/lib.rs
index fd4473e6102f8..a1a75ca772647 100644
--- a/turbopack/crates/turbo-persistence/src/lib.rs
+++ b/turbopack/crates/turbo-persistence/src/lib.rs
@@ -1,6 +1,7 @@
 #![feature(once_cell_try)]
 #![feature(new_zeroed_alloc)]
 #![feature(get_mut_unchecked)]
+#![feature(sync_unsafe_cell)]
 
 mod arc_slice;
 mod collector;
diff --git a/turbopack/crates/turbo-persistence/src/tests.rs b/turbopack/crates/turbo-persistence/src/tests.rs
index f74adf6e95ffd..08d2bbc1b878f 100644
--- a/turbopack/crates/turbo-persistence/src/tests.rs
+++ b/turbopack/crates/turbo-persistence/src/tests.rs
@@ -48,6 +48,28 @@ fn full_cycle() -> Result<()> {
         },
     );
 
+    test_case(
+        &mut test_cases,
+        "Many SST files",
+        |batch| {
+            for i in 10..100u8 {
+                batch.put(0, vec![i], vec![i].into())?;
+                unsafe { batch.flush(0)? };
+            }
+            Ok(())
+        },
+        |db| {
+            let Some(value) = db.get(0, &[42u8])? else {
+                panic!("Value not found");
+            };
+            assert_eq!(&*value, &[42]);
+            assert_eq!(db.get(0, &[42u8, 42])?, None);
+            assert_eq!(db.get(0, &[1u8])?, None);
+            assert_eq!(db.get(0, &[255u8])?, None);
+            Ok(())
+        },
+    );
+
     test_case(
         &mut test_cases,
         "Families",
diff --git a/turbopack/crates/turbo-persistence/src/write_batch.rs b/turbopack/crates/turbo-persistence/src/write_batch.rs
index 7039280570889..2f234d2eac5b7 100644
--- a/turbopack/crates/turbo-persistence/src/write_batch.rs
+++ b/turbopack/crates/turbo-persistence/src/write_batch.rs
@@ -1,5 +1,5 @@
 use std::{
-    cell::UnsafeCell,
+    cell::SyncUnsafeCell,
     fs::File,
     io::Write,
     mem::{replace, take},
@@ -17,6 +17,7 @@ use rayon::{
 };
 use smallvec::SmallVec;
 use thread_local::ThreadLocal;
+use tracing::Span;
 
 use crate::{
     collector::Collector,
@@ -68,7 +69,7 @@ pub struct WriteBatch {
     /// The current sequence number counter. Increased for every new SST file or blob file.
     current_sequence_number: AtomicU32,
     /// The thread local state.
-    thread_locals: ThreadLocal<UnsafeCell<ThreadLocalState<K, FAMILIES>>>,
+    thread_locals: ThreadLocal<SyncUnsafeCell<ThreadLocalState<K, FAMILIES>>>,
     /// Collectors in use. The thread local collectors flush into these when they are full.
     collectors: [Mutex<GlobalCollectorState<K>>; FAMILIES],
     /// The list of new SST files that have been created.
@@ -109,7 +110,7 @@ impl WriteBatch {
     #[allow(clippy::mut_from_ref)]
     fn thread_local_state(&self) -> &mut ThreadLocalState<K, FAMILIES> {
         let cell = self.thread_locals.get_or(|| {
-            UnsafeCell::new(ThreadLocalState {
+            SyncUnsafeCell::new(ThreadLocalState {
                 collectors: [const { None }; FAMILIES],
                 new_blob_files: Vec::new(),
             })
@@ -185,7 +186,7 @@ impl WriteBatch {
             let sst = self.create_sst_file(family, global_collector.sorted())?;
             global_collector.clear();
             self.new_sst_files.lock().push(sst);
-            self.idle_collectors.lock().push(global_collector);
+            self.dispose_collector(global_collector);
         }
         Ok(())
     }
@@ -197,6 +198,14 @@
             .unwrap_or_else(|| Collector::new())
     }
 
+    fn dispose_collector(&self, collector: Collector<K>) {
+        self.idle_collectors.lock().push(collector);
+    }
+
+    fn dispose_thread_local_collector(&self, collector: Collector<K>) {
+        self.idle_thread_local_collectors.lock().push(collector);
+    }
+
     /// Puts a key-value pair into the write batch.
     pub fn put(&self, family: u32, key: K, value: ValueBuffer<'_>) -> Result<()> {
         let state = self.thread_local_state();
@@ -219,6 +228,66 @@
         Ok(())
     }
 
+    /// Flushes a family of the write batch, reducing the amount of buffered memory used.
+    /// Does not commit any data persistently.
+    ///
+    /// # Safety
+    ///
+    /// Caller must ensure that no concurrent put or delete operation is happening on the flushed
+    /// family.
+    pub unsafe fn flush(&self, family: u32) -> Result<()> {
+        // Flush the thread local collectors to the global collector.
+        let mut collectors = Vec::new();
+        for cell in self.thread_locals.iter() {
+            let state = unsafe { &mut *cell.get() };
+            if let Some(collector) = state.collectors[usize_from_u32(family)].take() {
+                if !collector.is_empty() {
+                    collectors.push(collector);
+                }
+            }
+        }
+
+        let span = Span::current();
+        collectors.into_par_iter().try_for_each(|mut collector| {
+            let _span = span.clone().entered();
+            self.flush_thread_local_collector(family, &mut collector)?;
+            self.dispose_thread_local_collector(collector);
+            anyhow::Ok(())
+        })?;
+
+        // Now we flush the global collector(s).
+        let mut collector_state = self.collectors[usize_from_u32(family)].lock();
+        match &mut *collector_state {
+            GlobalCollectorState::Unsharded(collector) => {
+                if !collector.is_empty() {
+                    let sst = self.create_sst_file(family, collector.sorted())?;
+                    collector.clear();
+                    self.new_sst_files.lock().push(sst);
+                }
+            }
+            GlobalCollectorState::Sharded(_) => {
+                let GlobalCollectorState::Sharded(shards) = replace(
+                    &mut *collector_state,
+                    GlobalCollectorState::Unsharded(self.get_new_collector()),
+                ) else {
+                    unreachable!();
+                };
+                shards.into_par_iter().try_for_each(|mut collector| {
+                    let _span = span.clone().entered();
+                    if !collector.is_empty() {
+                        let sst = self.create_sst_file(family, collector.sorted())?;
+                        collector.clear();
+                        self.new_sst_files.lock().push(sst);
+                        self.dispose_collector(collector);
+                    }
+                    anyhow::Ok(())
+                })?;
+            }
+        }
+
+        Ok(())
+    }
+
     /// Finishes the write batch by returning the new sequence number and the new SST files. This
     /// writes all outstanding thread local data to disk.
     pub(crate) fn finish(&mut self) -> Result<FinishResult> {
@@ -249,7 +318,7 @@
                         {
                             *shared_error.lock() = Err(err);
                         }
-                        this.idle_thread_local_collectors.lock().push(collector);
+                        this.dispose_thread_local_collector(collector);
                     });
                 }
             }
@@ -283,7 +352,7 @@
                 if !collector.is_empty() {
                     let sst = self.create_sst_file(family, collector.sorted())?;
                     collector.clear();
-                    self.idle_collectors.lock().push(collector);
+                    self.dispose_collector(collector);
                     shared_new_sst_files.lock().push(sst);
                 }
                 anyhow::Ok(())
diff --git a/turbopack/crates/turbo-tasks-backend/src/database/noop_kv.rs b/turbopack/crates/turbo-tasks-backend/src/database/noop_kv.rs
index 050140198e59c..fbbd310efbb71 100644
--- a/turbopack/crates/turbo-tasks-backend/src/database/noop_kv.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/database/noop_kv.rs
@@ -90,6 +90,10 @@ impl SerialWriteBatch<'_> for NoopWriteBatch {
     fn delete(&mut self, _key_space: KeySpace, _key: WriteBuffer<'_>) -> Result<()> {
         Ok(())
     }
+
+    fn flush(&mut self, _key_space: KeySpace) -> Result<()> {
+        Ok(())
+    }
 }
 
 impl ConcurrentWriteBatch<'_> for NoopWriteBatch {
@@ -105,4 +109,8 @@ impl ConcurrentWriteBatch<'_> for NoopWriteBatch {
     fn delete(&self, _key_space: KeySpace, _key: WriteBuffer<'_>) -> Result<()> {
         Ok(())
     }
+
+    unsafe fn flush(&self, _key_space: KeySpace) -> Result<()> {
+        Ok(())
+    }
 }
diff --git a/turbopack/crates/turbo-tasks-backend/src/database/turbo.rs b/turbopack/crates/turbo-tasks-backend/src/database/turbo.rs
index 5a90e3e4bf663..1df9d269abf87 100644
--- a/turbopack/crates/turbo-tasks-backend/src/database/turbo.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/database/turbo.rs
@@ -160,6 +160,10 @@ impl<'a> ConcurrentWriteBatch<'a> for TurboWriteBatch<'a> {
     fn delete(&self, key_space: KeySpace, key: WriteBuffer<'_>) -> Result<()> {
         self.batch.delete(key_space as u32, key.into_static())
     }
+
+    unsafe fn flush(&self, key_space: KeySpace) -> Result<()> {
+        self.batch.flush(key_space as u32)
+    }
 }
 
 impl KeyBase for WriteBuffer<'_> {
diff --git a/turbopack/crates/turbo-tasks-backend/src/database/write_batch.rs b/turbopack/crates/turbo-tasks-backend/src/database/write_batch.rs
index 6754a4ebb69a3..a69f47e8746b0 100644
--- a/turbopack/crates/turbo-tasks-backend/src/database/write_batch.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/database/write_batch.rs
@@ -66,11 +66,18 @@ pub trait SerialWriteBatch<'a>: BaseWriteBatch<'a> {
         value: WriteBuffer<'_>,
     ) -> Result<()>;
     fn delete(&mut self, key_space: KeySpace, key: WriteBuffer<'_>) -> Result<()>;
+    fn flush(&mut self, key_space: KeySpace) -> Result<()>;
 }
 
 pub trait ConcurrentWriteBatch<'a>: BaseWriteBatch<'a> + Sync + Send {
     fn put(&self, key_space: KeySpace, key: WriteBuffer<'_>, value: WriteBuffer<'_>) -> Result<()>;
     fn delete(&self, key_space: KeySpace, key: WriteBuffer<'_>) -> Result<()>;
+    /// Flushes a key space of the write batch, reducing the amount of buffered memory used.
+    /// Does not commit any data persistently.
+    ///
+    /// Safety: Caller must ensure that no concurrent put or delete operation is happening on the
+    /// flushed key space.
+    unsafe fn flush(&self, key_space: KeySpace) -> Result<()>;
 }
 
 pub enum WriteBatch<'a, S, C>
@@ -164,6 +171,16 @@ where
             WriteBatch::Concurrent(c, _) => c.delete(key_space, key),
         }
     }
+
+    fn flush(&mut self, key_space: KeySpace) -> Result<()> {
+        match self {
+            WriteBatch::Serial(s) => s.flush(key_space),
+            WriteBatch::Concurrent(c, _) => {
+                // Safety: the &mut self ensures that no concurrent operation is happening
+                unsafe { c.flush(key_space) }
+            }
+        }
+    }
 }
 
 pub enum WriteBatchRef<'r, 'a, S, C>
@@ -241,6 +258,16 @@ where
             WriteBatchRef::Concurrent(c, _) => c.delete(key_space, key),
         }
     }
+
+    fn flush(&mut self, key_space: KeySpace) -> Result<()> {
+        match self {
+            WriteBatchRef::Serial(s) => s.flush(key_space),
+            WriteBatchRef::Concurrent(c, _) => {
+                // Safety: the &mut self ensures that no concurrent operation is happening
+                unsafe { c.flush(key_space) }
+            }
+        }
+    }
 }
 
 pub struct UnimplementedWriteBatch;
@@ -275,6 +302,9 @@ impl SerialWriteBatch<'_> for UnimplementedWriteBatch {
     fn delete(&mut self, _key_space: KeySpace, _key: WriteBuffer<'_>) -> Result<()> {
         todo!()
     }
+    fn flush(&mut self, _key_space: KeySpace) -> Result<()> {
+        todo!()
+    }
 }
 
 impl ConcurrentWriteBatch<'_> for UnimplementedWriteBatch {
@@ -289,4 +319,7 @@ impl ConcurrentWriteBatch<'_> for UnimplementedWriteBatch {
     fn delete(&self, _key_space: KeySpace, _key: WriteBuffer<'_>) -> Result<()> {
         todo!()
     }
+    unsafe fn flush(&self, _key_space: KeySpace) -> Result<()> {
+        todo!()
+    }
 }
diff --git a/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs b/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs
index 76086f8a004bb..90c3b5c0334be 100644
--- a/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs
@@ -184,6 +184,13 @@ impl BackingStorage
         {
             let _span = tracing::trace_span!("update task data").entered();
             process_task_data(snapshots, Some(batch))?;
+            [KeySpace::TaskMeta, KeySpace::TaskData]
+                .into_par_iter()
+                .try_for_each(|key_space| {
+                    // Safety: We already finished all processing of the task data and task
+                    // meta
+                    unsafe { batch.flush(key_space) }
+                })?;
         }
 
         let mut next_task_id = get_next_free_task_id::<
@@ -500,6 +507,7 @@
             )
             .with_context(|| anyhow!("Unable to write operations"))?;
         }
+        batch.flush(KeySpace::Infra)?;
         Ok(())
     }
 