From 08ff39b80555da1fa253973df00ab882996c5b18 Mon Sep 17 00:00:00 2001 From: jobala Date: Wed, 29 Oct 2025 08:40:28 +0300 Subject: [PATCH 1/2] move flushing logic to compaction module --- Cargo.lock | 63 +++++++++++++++++++++++++ storage/Cargo.toml | 2 + storage/src/block/builder.rs | 2 +- storage/src/compaction/flush.rs | 54 +++++++++++++++++++++ storage/src/compaction/mod.rs | 1 + storage/src/lib.rs | 1 + storage/src/lsm_storage.rs | 84 +++++++++++++-------------------- storage/src/sst/builder.rs | 2 +- 8 files changed, 155 insertions(+), 54 deletions(-) create mode 100644 storage/src/compaction/flush.rs create mode 100644 storage/src/compaction/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 93cef1e..c6c692b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -46,9 +46,11 @@ version = "0.1.2" dependencies = [ "anyhow", "bytes", + "crossbeam", "crossbeam-skiplist", "moka", "ouroboros", + "serde", "tempfile", ] @@ -62,6 +64,19 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" +[[package]] +name = "crossbeam" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + [[package]] name = "crossbeam-channel" version = "0.5.15" @@ -71,6 +86,16 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-deque" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + [[package]] name = "crossbeam-epoch" version = "0.9.18" @@ -80,6 +105,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-skiplist" version = "0.1.3" @@ -336,6 +370,35 @@ version = "1.0.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2" +[[package]] +name = "serde" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", +] + +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "smallvec" version = "1.15.1" diff --git a/storage/Cargo.toml b/storage/Cargo.toml index 5241c18..5f57b96 100644 --- a/storage/Cargo.toml +++ b/storage/Cargo.toml @@ -8,9 +8,11 @@ description = "LSMTree based storage engine" [dependencies] anyhow = "1.0.99" bytes = "1.10.1" +crossbeam = "0.8.4" crossbeam-skiplist = "0.1.3" moka = { version = "0.12.11", features = ["sync"] } ouroboros = "0.18.5" +serde = "1.0.228" [dev-dependencies] tempfile = "3.23.0" diff --git a/storage/src/block/builder.rs b/storage/src/block/builder.rs index 79913e5..a14274c 100644 --- a/storage/src/block/builder.rs +++ b/storage/src/block/builder.rs @@ -5,9 +5,9 @@ use crate::block::{Block, SIZEOF_U16}; #[derive(Debug)] pub struct BlockBuilder { pub(crate) data: Vec, + pub(crate) first_key: Vec, offsets: Vec, block_size: usize, - pub(crate) 
first_key: Vec, } impl BlockBuilder { diff --git a/storage/src/compaction/flush.rs b/storage/src/compaction/flush.rs new file mode 100644 index 0000000..a74cc67 --- /dev/null +++ b/storage/src/compaction/flush.rs @@ -0,0 +1,54 @@ +use anyhow::Result; +use std::sync::Arc; + +use crate::{SSTableBuilder, Storage, lsm_storage::StorageState}; + +impl Storage { + pub(crate) fn flush_frozen_memtable(&self) -> Result<()> { + let mut sst_builder = SSTableBuilder::new(self.config.block_size); + let mut guard = self.state.write().unwrap(); + + let mut memtables = guard.frozen_memtables.clone(); + let mut l0_sstables = guard.l0_sstables.clone(); + let mut sstables = guard.sstables.clone(); + + let Some(memtable) = memtables.pop() else { + return Ok(()); + }; + memtable.flush(&mut sst_builder)?; + + let sst = sst_builder.build( + memtable.id, + self.block_cache.clone(), + self.sst_path(memtable.id), + )?; + l0_sstables.push(memtable.id); + sstables.insert(memtable.id, Arc::new(sst)); + + *guard = Arc::new(StorageState { + memtable: guard.memtable.clone(), + frozen_memtables: memtables, + l0_sstables, + sstables, + }); + + Ok(()) + } + + fn trigger_flush(&self) -> Result<()> { + let memtable_count = { + let guard = self.state.read().unwrap(); + guard.frozen_memtables.len() + }; + + if self.config.num_memtable_limit > memtable_count { + self.flush_frozen_memtable()?; + } + + Ok(()) + } + + fn sst_path(&self, id: usize) -> String { + format!("{}/sst/{}.sst", self.config.db_dir, id) + } +} diff --git a/storage/src/compaction/mod.rs b/storage/src/compaction/mod.rs new file mode 100644 index 0000000..73fe11f --- /dev/null +++ b/storage/src/compaction/mod.rs @@ -0,0 +1 @@ +pub mod flush; diff --git a/storage/src/lib.rs b/storage/src/lib.rs index 7a51736..e4152d4 100644 --- a/storage/src/lib.rs +++ b/storage/src/lib.rs @@ -1,4 +1,5 @@ pub mod common; +pub mod compaction; pub mod iterators; pub mod lsm_storage; mod lsm_util; diff --git a/storage/src/lsm_storage.rs 
b/storage/src/lsm_storage.rs index 103da63..94459fa 100644 --- a/storage/src/lsm_storage.rs +++ b/storage/src/lsm_storage.rs @@ -6,10 +6,11 @@ use std::{ Arc, Mutex, MutexGuard, RwLock, atomic::{AtomicUsize, Ordering::SeqCst}, }, + thread, }; use crate::{ - SSTable, SSTableBuilder, SSTableIterator, + SSTable, SSTableIterator, common::{errors::KeyNotFound, iterator::StorageIterator}, iterators::{ lsm_iterator::LsmIterator, merged_iterator::MergedIterator, @@ -24,25 +25,28 @@ use bytes::Bytes; #[derive(Debug)] pub struct Storage { - state: RwLock>, - block_cache: Arc, - state_lock: Mutex<()>, - sst_id: AtomicUsize, - config: Config, + pub(crate) state: RwLock>, + pub(crate) config: Config, + pub(crate) block_cache: Arc, + pub(crate) state_lock: Mutex<()>, + pub(crate) sst_id: AtomicUsize, + // pub(crate) flush_thread: Mutex>>, + // pub(crate) flush_notifier: crossbeam::channel::Sender<()>, } #[derive(Debug)] -struct StorageState { - memtable: Arc, - frozen_memtables: Vec>, - l0_sstables: Vec, - sstables: HashMap>, +pub(crate) struct StorageState { + pub(crate) memtable: Arc, + pub(crate) frozen_memtables: Vec>, + pub(crate) l0_sstables: Vec, + pub(crate) sstables: HashMap>, } #[derive(Debug)] pub struct Config { pub sst_size: usize, pub block_size: usize, + pub num_memtable_limit: usize, pub db_dir: String, } @@ -202,44 +206,9 @@ impl Storage { } } - fn flush_frozen_memtable(&self) -> Result<()> { - let mut sst_builder = SSTableBuilder::new(self.config.block_size); - let mut guard = self.state.write().unwrap(); - - let mut memtables = guard.frozen_memtables.clone(); - let mut l0_sstables = guard.l0_sstables.clone(); - let mut sstables = guard.sstables.clone(); - - let Some(memtable) = memtables.pop() else { - return Ok(()); - }; - memtable.flush(&mut sst_builder)?; - - let sst = sst_builder.build( - memtable.id, - self.block_cache.clone(), - self.sst_path(memtable.id), - )?; - l0_sstables.push(memtable.id); - sstables.insert(memtable.id, Arc::new(sst)); - - *guard = 
Arc::new(StorageState { - memtable: guard.memtable.clone(), - frozen_memtables: memtables, - l0_sstables, - sstables, - }); - - Ok(()) - } - fn inc_sst_id(&self) -> usize { self.sst_id.fetch_add(1, SeqCst) } - - fn sst_path(&self, id: usize) -> String { - format!("{}/sst/{}.sst", self.config.db_dir, id) - } } #[cfg(test)] @@ -248,7 +217,7 @@ mod tests { use tempfile::tempdir; - use crate::lsm_util::{self, get_entries}; + use crate::lsm_util::get_entries; use super::*; @@ -402,11 +371,6 @@ mod tests { let mut values = vec![]; while iter.is_valid() { - println!( - "key: {:?}, value: {:?}", - from_utf8(iter.key()).unwrap(), - from_utf8(iter.value()).unwrap() - ); let k = from_utf8(iter.key()).unwrap(); let v = from_utf8(iter.value()).unwrap(); @@ -416,6 +380,22 @@ mod tests { let _ = iter.next(); } + // expect the first frozen memtable to have id 3 + // we already have sstables with id 0, 1 & 2 + // we use .last here because frozen memtables are stored newest to oldest + // with oldest being the first memtable to be frozen + assert_eq!( + 3, + storage + .state + .read() + .unwrap() + .frozen_memtables + .last() + .unwrap() + .id + ); + assert_eq!(keys, vec!["a", "b", "c", "d", "e", "f"]); assert_eq!(values, vec!["20", "23", "3", "22", "21", "6"]); } diff --git a/storage/src/sst/builder.rs b/storage/src/sst/builder.rs index 8971b05..ed6767f 100644 --- a/storage/src/sst/builder.rs +++ b/storage/src/sst/builder.rs @@ -12,11 +12,11 @@ use crate::sst::table::{BlockCache, SSTable}; #[derive(Debug)] pub struct SSTableBuilder { + pub(crate) meta: Vec, builder: BlockBuilder, first_key: Vec, last_key: Vec, data: Vec, - pub(crate) meta: Vec, block_size: usize, } From d1a69282480ccfcba1ddf223df00d555749600b0 Mon Sep 17 00:00:00 2001 From: jobala Date: Wed, 29 Oct 2025 09:57:34 +0300 Subject: [PATCH 2/2] fix errors --- storage/src/compaction/flush.rs | 18 +++++++++++++++++- storage/src/lsm_storage.rs | 19 ++++++++++++------- storage/tests/storage.rs | 4 ++++ 3 files changed, 33 
insertions(+), 8 deletions(-) diff --git a/storage/src/compaction/flush.rs b/storage/src/compaction/flush.rs index a74cc67..79ed1ad 100644 --- a/storage/src/compaction/flush.rs +++ b/storage/src/compaction/flush.rs @@ -1,5 +1,9 @@ use anyhow::Result; -use std::sync::Arc; +use std::{ + sync::Arc, + thread::{self, JoinHandle}, + time::Duration, +}; use crate::{SSTableBuilder, Storage, lsm_storage::StorageState}; @@ -52,3 +56,15 @@ impl Storage { format!("{}/sst/{}.sst", self.config.db_dir, id) } } + +// TODO: support msg passing +pub fn spawn_flusher(storage: Arc) -> JoinHandle<()> { + let this = storage.clone(); + + thread::spawn(move || { + loop { + this.trigger_flush().expect("memtable to have been flushed"); + thread::sleep(Duration::from_millis(50)); + } + }) +} diff --git a/storage/src/lsm_storage.rs b/storage/src/lsm_storage.rs index 94459fa..9bfb275 100644 --- a/storage/src/lsm_storage.rs +++ b/storage/src/lsm_storage.rs @@ -6,7 +6,6 @@ use std::{ Arc, Mutex, MutexGuard, RwLock, atomic::{AtomicUsize, Ordering::SeqCst}, }, - thread, }; use crate::{ @@ -30,8 +29,6 @@ pub struct Storage { pub(crate) block_cache: Arc, pub(crate) state_lock: Mutex<()>, pub(crate) sst_id: AtomicUsize, - // pub(crate) flush_thread: Mutex>>, - // pub(crate) flush_notifier: crossbeam::channel::Sender<()>, } #[derive(Debug)] @@ -50,10 +47,10 @@ pub struct Config { pub db_dir: String, } -pub fn new(config: Config) -> Storage { +pub fn new(config: Config) -> Arc { let db_dir = Path::new(&config.db_dir); create_db_dir(db_dir); - let block_cache = Arc::new(BlockCache::new(1 << 20)); // 4gb + let block_cache = Arc::new(BlockCache::new(4096)); let (l0_sstables, sstables) = load_sstables(db_dir, block_cache).expect("loaded sstables"); let sst_id = match l0_sstables.iter().max() { @@ -61,7 +58,7 @@ pub fn new(config: Config) -> Storage { None => 0, }; - Storage { + Arc::new(Storage { config, sst_id: AtomicUsize::new(sst_id), state_lock: Mutex::new(()), @@ -72,7 +69,7 @@ pub fn new(config:
Config) -> Storage { l0_sstables, sstables, })), - } + }) } impl Storage { @@ -227,6 +224,7 @@ mod tests { sst_size: 4, block_size: 32, db_dir: String::from(tempdir().unwrap().path().to_str().unwrap()), + num_memtable_limit: 5, }; let storage = new(config); @@ -245,6 +243,7 @@ mod tests { sst_size: 4, block_size: 32, db_dir: String::from(tempdir().unwrap().path().to_str().unwrap()), + num_memtable_limit: 5, }; let storage = new(config); @@ -269,6 +268,7 @@ mod tests { sst_size: 4, block_size: 32, db_dir: db_dir.clone(), + num_memtable_limit: 5, }; let storage = new(config); @@ -285,6 +285,7 @@ mod tests { sst_size: 4, block_size: 32, db_dir: db_dir.clone(), + num_memtable_limit: 5, }; let storage = new(config); @@ -299,6 +300,7 @@ mod tests { sst_size: 4, block_size: 32, db_dir: db_dir.clone(), + num_memtable_limit: 5, }; let storage = new(config); @@ -314,6 +316,7 @@ mod tests { sst_size: 4, block_size: 32, db_dir: db_dir.clone(), + num_memtable_limit: 5, }; let storage = new(config); @@ -335,6 +338,7 @@ mod tests { sst_size: 4, block_size: 32, db_dir: db_dir.clone(), + num_memtable_limit: 5, }; let storage = new(config); @@ -358,6 +362,7 @@ mod tests { sst_size: 4, block_size: 32, db_dir: db_dir.clone(), + num_memtable_limit: 5, }; let new_entries = vec![(b"a", b"20"), (b"e", b"21"), (b"d", b"22"), (b"b", b"23")]; diff --git a/storage/tests/storage.rs b/storage/tests/storage.rs index 4e5c25a..48cd531 100644 --- a/storage/tests/storage.rs +++ b/storage/tests/storage.rs @@ -13,6 +13,7 @@ fn get_returns_latest_entry() { sst_size: 2, block_size: 32, db_dir: get_temp_dir(), + num_memtable_limit: 5, }; let storage = cabin_storage::new(config); let entries = vec![ @@ -36,6 +37,7 @@ fn can_read_frozen_memtable() { sst_size: 2, block_size: 32, db_dir: get_temp_dir(), + num_memtable_limit: 5, }; let storage = cabin_storage::new(config); let entries = vec![(b"1", b"20"), (b"2", b"21"), (b"3", b"22"), (b"4", b"23")]; @@ -55,6 +57,7 @@ fn get_invalid_key() { sst_size: 2, 
block_size: 32, db_dir: String::from(tempdir().unwrap().path().to_str().unwrap()), + num_memtable_limit: 5, }; let storage = cabin_storage::new(config); @@ -67,6 +70,7 @@ fn scan_items() { sst_size: 10, block_size: 32, db_dir: get_temp_dir(), + num_memtable_limit: 5, }; let storage = cabin_storage::new(config); let entries = vec![