From 93840ae0bf2ea81317b7e833a4d1d0d2e47a09d5 Mon Sep 17 00:00:00 2001 From: jobala Date: Thu, 6 Nov 2025 18:29:09 +0300 Subject: [PATCH 1/3] fix failing tests --- storage/src/lsm_storage.rs | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/storage/src/lsm_storage.rs b/storage/src/lsm_storage.rs index 7bf2413..8ff15c9 100644 --- a/storage/src/lsm_storage.rs +++ b/storage/src/lsm_storage.rs @@ -436,7 +436,7 @@ mod tests { block_size: 32, db_dir: db_dir.clone(), num_memtable_limit: 5, - enable_wal: true, + enable_wal: false, }; let storage = new(config); @@ -540,8 +540,16 @@ mod tests { .id ); - assert_eq!(keys, vec!["a", "b", "c", "d", "e", "f"]); - assert_eq!(values, vec!["20", "23", "3", "22", "21", "6"]); + assert_eq!( + keys, + vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l"] + ); + assert_eq!( + values, + vec![ + "20", "23", "3", "22", "21", "6", "7", "8", "9", "10", "11", "12" + ] + ); } #[test] @@ -577,7 +585,7 @@ mod tests { block_size: 32, db_dir: db_dir.clone(), num_memtable_limit: 5, - enable_wal: true, + enable_wal: false, }; let new_entries = vec![(b"a", b"20"), (b"e", b"21"), (b"d", b"22"), (b"b", b"23")]; @@ -598,7 +606,6 @@ mod tests { while iter.is_valid() { let k = from_utf8(iter.key()).unwrap(); let v = from_utf8(iter.value()).unwrap(); - println!("key: {:?}, value: {:?}", k, v); keys.push(String::from(k)); values.push(String::from(v)); From 34d5b8a16aa7aeb00fd27559e9163c2c2a7306ce Mon Sep 17 00:00:00 2001 From: jobala Date: Thu, 6 Nov 2025 18:44:35 +0300 Subject: [PATCH 2/3] return result from storage::new --- storage/src/compaction/compact.rs | 8 +++--- storage/src/lsm_storage.rs | 41 +++++++++++++++---------------- storage/tests/storage.rs | 8 +++--- 3 files changed, 28 insertions(+), 29 deletions(-) diff --git a/storage/src/compaction/compact.rs b/storage/src/compaction/compact.rs index 443cab8..9e3fa21 100644 --- a/storage/src/compaction/compact.rs +++ b/storage/src/compaction/compact.rs @@ -113,7 +113,7 @@ mod tests { num_memtable_limit: 5, enable_wal: true, }; - let storage = new(config); + let storage = new(config).unwrap(); for (key, value) in get_entries() { storage.put(key, value).unwrap(); @@ -148,7 +148,7 @@ mod tests { num_memtable_limit: 5, enable_wal: true, }; - let storage = new(config); + let storage = new(config).unwrap(); for (key, value) in get_entries() { storage.put(key, value).unwrap(); @@ -211,7 +211,7 @@ mod tests { num_memtable_limit: 5, enable_wal: true, }; - let storage = new(config); + let storage = new(config).unwrap(); let mut entries = get_entries(); // adds a new version of key a=3 @@ -268,7 +268,7 @@ mod tests { num_memtable_limit: 5, enable_wal: true, }; - let storage = new(config); + let storage = new(config).unwrap(); for (key, value) in get_entries() { storage.put(key, value).unwrap(); diff --git a/storage/src/lsm_storage.rs b/storage/src/lsm_storage.rs index 8ff15c9..c0c29f2 100644 --- a/storage/src/lsm_storage.rs +++ b/storage/src/lsm_storage.rs @@ -51,7 +51,7 @@ pub struct Config { pub enable_wal: bool, } -pub fn new(config: Config) -> Arc { +pub fn new(config: Config) -> Result> { let state_lock = Mutex::new(()); let block_cache = Arc::new(BlockCache::new(4096)); let db_dir = Path::new(&config.db_dir); @@ -66,11 +66,11 @@ pub fn new(config: Config) -> Arc { manifest = man; manifest_records = manifest_recs; } - Err(_) => manifest = Manifest::create(manifest_file).unwrap(), + Err(_) => manifest = Manifest::create(manifest_file)?, } let (memtable_ids, l0_sst_ids, l1_sst_ids, sstables) = - load_sstables(db_dir, block_cache, manifest_records).expect("loaded sstables"); + load_sstables(db_dir, block_cache, manifest_records)?; let sst_id = match ([&memtable_ids[..], &l0_sst_ids[..], &l1_sst_ids[..]].concat()) .iter() .max() @@ -83,7 +83,7 @@ pub fn new(config: Config) -> Arc { true => { let wal_path = db_dir.join(format!("{sst_id}.wal")); if memtable_ids.is_empty() { - let memtable = Memtable::new_with_wal(sst_id, wal_path.as_path()).expect(""); + let memtable = Memtable::new_with_wal(sst_id, wal_path.as_path())?; manifest .add_record( &state_lock.lock().unwrap(), @@ -96,8 +96,7 @@ pub fn new(config: Config) -> Arc { let mut memtables = vec![]; for id in memtable_ids { let wal_path = db_dir.join(format!("{id}.wal")); - let memtable = - Memtable::new_with_wal(id, wal_path.as_path()).expect("created memtable"); + let memtable = Memtable::new_with_wal(id, wal_path.as_path())?; memtables.push(Arc::new(memtable)); } @@ -125,7 +124,7 @@ pub fn new(config: Config) -> Arc { .expect("added manifest record"); } - Arc::new(Storage { + Ok(Arc::new(Storage { config, state_lock, manifest, @@ -138,7 +137,7 @@ pub fn new(config: Config) -> Arc { l0_sstables: l0_sst_ids, levels: vec![(0, l1_sst_ids)], })), - }) + })) } impl Storage { @@ -358,7 +357,7 @@ mod tests { num_memtable_limit: 5, enable_wal: true, }; - let storage = new(config); + let storage = new(config).unwrap(); let input = vec![b"1", b"2", b"3", b"4", b"5"]; for entry in input { @@ -378,7 +377,7 @@ mod tests { num_memtable_limit: 5, enable_wal: true, }; - let storage = new(config); + let storage = new(config).unwrap(); let input = vec![b"1", b"2", b"3", b"4", b"5"]; for entry in input { @@ -404,7 +403,7 @@ mod tests { num_memtable_limit: 5, enable_wal: true, }; - let storage = new(config); + let storage = new(config).unwrap(); let input = vec![b"1", b"2", b"3", b"4", b"5"]; for entry in input { @@ -423,7 +422,7 @@ mod tests { enable_wal: true, }; - let storage = new(config); + let storage = new(config).unwrap(); assert_eq!(1, storage.state.read().unwrap().l0_sstables.len()); assert_eq!(0, storage.state.read().unwrap().l0_sstables[0]); } @@ -438,7 +437,7 @@ mod tests { num_memtable_limit: 5, enable_wal: false, }; - let storage = new(config); + let storage = new(config).unwrap(); for (key, value) in get_entries() { storage.put(key, value).unwrap(); @@ -456,7 +455,7 @@ mod tests { enable_wal: true, }; - let storage = new(config); + let storage = new(config).unwrap(); let mut iter = storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap(); let mut res = vec![]; @@ -478,7 +477,7 @@ mod tests { num_memtable_limit: 5, enable_wal: true, }; - let storage = new(config); + let storage = new(config).unwrap(); for (key, value) in get_entries() { storage.put(key, value).unwrap(); @@ -505,7 +504,7 @@ mod tests { }; let new_entries = vec![(b"a", b"20"), (b"e", b"21"), (b"d", b"22"), (b"b", b"23")]; - let storage = new(config); + let storage = new(config).unwrap(); for (key, value) in new_entries { let _ = storage.put(key, value); } @@ -562,7 +561,7 @@ mod tests { num_memtable_limit: 5, enable_wal: true, }; - let storage = new(config); + let storage = new(config).unwrap(); for (key, value) in get_entries() { storage.put(key, value).unwrap(); @@ -589,7 +588,7 @@ mod tests { }; let new_entries = vec![(b"a", b"20"), (b"e", b"21"), (b"d", b"22"), (b"b", b"23")]; - let storage = new(config); + let storage = new(config).unwrap(); for (key, value) in new_entries { let _ = storage.put(key, value); } @@ -626,7 +625,7 @@ mod tests { num_memtable_limit: 5, enable_wal: true, }; - let storage = new(config); + let storage = new(config).unwrap(); for (key, value) in get_entries() { storage.put(key, value).unwrap(); @@ -685,7 +684,7 @@ mod tests { num_memtable_limit: 5, enable_wal: true, }; - let storage = new(config); + let storage = new(config).unwrap(); for (key, value) in get_entries() { storage.put(key, value).unwrap(); @@ -716,7 +715,7 @@ mod tests { num_memtable_limit: 5, enable_wal: true, }; - let storage = new(config); + let storage = new(config).unwrap(); let state = { let guard = storage.state.read().unwrap(); diff --git a/storage/tests/storage.rs b/storage/tests/storage.rs index 433a489..7bdc9f8 100644 --- a/storage/tests/storage.rs +++ b/storage/tests/storage.rs @@ -16,7 +16,7 @@ fn get_returns_latest_entry() { num_memtable_limit: 5, enable_wal: true, }; - let storage = cabin_storage::new(config); + let storage = cabin_storage::new(config).unwrap(); let entries = vec![ (b"age", b"20"), (b"age", b"21"), @@ -41,7 +41,7 @@ fn can_read_frozen_memtable() { num_memtable_limit: 5, enable_wal: true, }; - let storage = cabin_storage::new(config); + let storage = cabin_storage::new(config).unwrap(); let entries = vec![(b"1", b"20"), (b"2", b"21"), (b"3", b"22"), (b"4", b"23")]; for (k, v) in entries { @@ -62,7 +62,7 @@ fn get_invalid_key() { num_memtable_limit: 5, enable_wal: true, }; - let storage = cabin_storage::new(config); + let storage = cabin_storage::new(config).unwrap(); storage.get(b"1").unwrap(); } @@ -76,7 +76,7 @@ fn scan_items() { num_memtable_limit: 5, enable_wal: true, }; - let storage = cabin_storage::new(config); + let storage = cabin_storage::new(config).unwrap(); let entries = vec![ (b"e", b"4"), (b"a", b"1"), From 5268ac31e2a97cb80c63301367747c50baa9912e Mon Sep 17 00:00:00 2001 From: jobala Date: Thu, 6 Nov 2025 19:15:20 +0300 Subject: [PATCH 3/3] move things around --- storage/src/compaction/compact.rs | 13 +- storage/src/compaction/flush.rs | 7 +- storage/src/lib.rs | 6 +- storage/src/lsm_storage.rs | 724 +---------------------------- storage/src/lsm_storage_inner.rs | 729 ++++++++++++++++++++++++++++++ 5 files changed, 768 insertions(+), 711 deletions(-) create mode 100644 storage/src/lsm_storage_inner.rs diff --git a/storage/src/compaction/compact.rs b/storage/src/compaction/compact.rs index 9e3fa21..a257576 100644 --- a/storage/src/compaction/compact.rs +++ b/storage/src/compaction/compact.rs @@ -8,13 +8,14 @@ use std::{ use anyhow::{Ok, Result}; use crate::{ - SSTableBuilder, SSTableIterator, Storage, common::iterator::StorageIterator, - iterators::merged_iterator::MergedIterator, lsm_storage::StorageState, - manifest::ManifestRecord::Compaction, + SSTableBuilder, SSTableIterator, common::iterator::StorageIterator, + iterators::merged_iterator::MergedIterator, lsm_storage_inner::StorageInner, + lsm_storage_inner::StorageState, manifest::ManifestRecord::Compaction, }; + const COMPACT_INTERVAL: Duration = Duration::from_secs(60); -impl Storage { +impl StorageInner { pub fn trigger_compaction(&self) -> Result<()> { let state = { let guard = self.state.read().unwrap(); @@ -101,7 +102,9 @@ mod tests { use bytes::Bytes; use tempfile::tempdir; - use crate::{Config, common::iterator::StorageIterator, lsm_util::get_entries, new}; + use crate::{ + Config, common::iterator::StorageIterator, lsm_storage_inner::new, lsm_util::get_entries, + }; #[test] fn test_compaction() { diff --git a/storage/src/compaction/flush.rs b/storage/src/compaction/flush.rs index 161a8a6..a96a5b6 100644 --- a/storage/src/compaction/flush.rs +++ b/storage/src/compaction/flush.rs @@ -7,11 +7,14 @@ use std::{ time::Duration, }; -use crate::{SSTableBuilder, Storage, lsm_storage::StorageState, manifest::ManifestRecord::Flush}; +use crate::{ + SSTableBuilder, lsm_storage_inner::StorageInner, lsm_storage_inner::StorageState, + manifest::ManifestRecord::Flush, +}; const FLUSH_INTERVAL: Duration = Duration::from_millis(50); -impl Storage { +impl StorageInner { pub(crate) fn flush_frozen_memtable(&self) -> Result<()> { let sst_id = { let mut sst_builder = SSTableBuilder::new(self.config.block_size); diff --git a/storage/src/lib.rs b/storage/src/lib.rs index d0d9574..7c75ddb 100644 --- a/storage/src/lib.rs +++ b/storage/src/lib.rs @@ -1,7 +1,8 @@ pub mod common; pub mod compaction; pub mod iterators; -pub mod lsm_storage; +mod lsm_storage; +mod lsm_storage_inner; mod lsm_util; mod manifest; pub mod memtable; @@ -9,5 +10,6 @@ mod wal; mod block; mod sst; -pub use lsm_storage::{Config, Storage, new}; +pub use lsm_storage::new; +pub use lsm_storage_inner::Config; pub use sst::{FileObject, SSTable, SSTableBuilder, SSTableIterator}; diff --git a/storage/src/lsm_storage.rs b/storage/src/lsm_storage.rs index c0c29f2..d1f52ad 100644 --- a/storage/src/lsm_storage.rs +++ b/storage/src/lsm_storage.rs @@ -1,728 +1,48 @@ +use std::ops::Bound; +use std::sync::Arc; +use std::thread::JoinHandle; + use anyhow::Result; use bytes::Bytes; -use std::{ - collections::HashMap, - ops::Bound, - path::Path, - sync::{ - Arc, Mutex, MutexGuard, RwLock, - atomic::{AtomicUsize, Ordering::SeqCst}, - }, -}; -use crate::{ - SSTable, SSTableIterator, - common::{errors::KeyNotFound, iterator::StorageIterator}, - iterators::{ - concat_iterator::ConcatIterator, lsm_iterator::LsmIterator, - merged_iterator::MergedIterator, two_merge_iterator::TwoMergeIterator, - }, - lsm_util::{create_db_dir, load_sstables}, - manifest::{Manifest, ManifestRecord}, - memtable::{memtable_iterator::map_bound, table::Memtable}, - sst::BlockCache, -}; +use crate::common::errors::KeyNotFound; +use crate::iterators::lsm_iterator::LsmIterator; +use crate::lsm_storage_inner; +use crate::{Config, lsm_storage_inner::StorageInner}; -#[derive(Debug)] pub struct Storage { - pub(crate) state: RwLock>, - pub(crate) config: Config, - pub(crate) block_cache: Arc, - pub(crate) state_lock: Mutex<()>, - pub(crate) sst_id: AtomicUsize, - pub(crate) manifest: Manifest, -} - -#[derive(Debug)] -pub(crate) struct StorageState { - pub(crate) memtable: Arc, - pub(crate) frozen_memtables: Vec>, - pub(crate) l0_sstables: Vec, - pub(crate) sstables: HashMap>, - pub(crate) levels: Vec<(usize, Vec)>, -} - -#[derive(Debug)] -pub struct Config { - pub sst_size: usize, - pub block_size: usize, - pub num_memtable_limit: usize, - pub db_dir: String, - pub enable_wal: bool, + inner: Arc, + compacter: JoinHandle<()>, + flusher: JoinHandle<()>, } pub fn new(config: Config) -> Result> { - let state_lock = Mutex::new(()); - let block_cache = Arc::new(BlockCache::new(4096)); - let db_dir = Path::new(&config.db_dir); - - create_db_dir(db_dir); - - let manifest; - let mut manifest_records: Vec = vec![]; - let manifest_file = db_dir.join("manifest"); - match Manifest::recover(&manifest_file) { - Ok((man, manifest_recs)) => { - manifest = man; - manifest_records = manifest_recs; - } - Err(_) => manifest = Manifest::create(manifest_file)?, - } - - let (memtable_ids, l0_sst_ids, l1_sst_ids, sstables) = - load_sstables(db_dir, block_cache, manifest_records)?; - let sst_id = match ([&memtable_ids[..], &l0_sst_ids[..], &l1_sst_ids[..]].concat()) - .iter() - .max() - { - Some(id) => id + 1, - None => 0, - }; - - let (memtable, frozen_memtables) = match config.enable_wal { - true => { - let wal_path = db_dir.join(format!("{sst_id}.wal")); - if memtable_ids.is_empty() { - let memtable = Memtable::new_with_wal(sst_id, wal_path.as_path())?; - manifest - .add_record( - &state_lock.lock().unwrap(), - ManifestRecord::NewMemtable(sst_id), - ) - .expect("added manifest record"); - - (Arc::new(memtable), vec![]) - } else { - let mut memtables = vec![]; - for id in memtable_ids { - let wal_path = db_dir.join(format!("{id}.wal")); - let memtable = Memtable::new_with_wal(id, wal_path.as_path())?; - memtables.push(Arc::new(memtable)); - } - - (memtables.remove(0), memtables) - } - } - _ => { - manifest - .add_record( - &state_lock.lock().unwrap(), - ManifestRecord::NewMemtable(sst_id), - ) - .expect("added manifest record"); - - (Arc::new(Memtable::new(sst_id)), vec![]) - } - }; - - if memtable.get_size() == 0 { - manifest - .add_record( - &state_lock.lock().unwrap(), - ManifestRecord::NewMemtable(sst_id), - ) - .expect("added manifest record"); - } + let inner = lsm_storage_inner::new(config)?; + let compacter = inner.spawn_compacter(); + let flusher = inner.spawn_flusher(); Ok(Arc::new(Storage { - config, - state_lock, - manifest, - sst_id: AtomicUsize::new(sst_id), - block_cache: Arc::new(BlockCache::new(1 << 20)), // 1mb cache - state: RwLock::new(Arc::new(StorageState { - sstables, - frozen_memtables, - memtable, - l0_sstables: l0_sst_ids, - levels: vec![(0, l1_sst_ids)], - })), + inner, + compacter, + flusher, })) } impl Storage { pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { - let size; - - { - let guard = self.state.read().unwrap(); - guard.memtable.put(key, value)?; - size = guard.memtable.get_size(); - } - - self.try_freeze(size)?; - Ok(()) + self.inner.put(key, value) } pub fn get(&self, key: &[u8]) -> Result { - let state = { - let guard = self.state.read().unwrap(); - guard.clone() - }; - - let mut res = state.memtable.get(key); - - // search in frozen memtables - if res.is_none() { - for frozen_table in state.frozen_memtables.clone() { - if frozen_table.get(key).is_some() { - res = frozen_table.get(key); - break; - } - } - } - - // search in l0 ssts - if res.is_none() { - let mut table_iters = Vec::with_capacity(state.l0_sstables.len()); - for table_id in state.l0_sstables.iter() { - let table = state.sstables[table_id].clone(); - if key < table.first_key() || key > table.last_key() { - continue; - } - - let iter = SSTableIterator::create_and_seek_to_key(table, key).unwrap(); - table_iters.push(iter); - } - - let merged_iter = MergedIterator::new(table_iters); - if !merged_iter.key().is_empty() && merged_iter.key() == key { - res = Some(Bytes::copy_from_slice(merged_iter.value())) - } else { - res = None; - } - } - - // search in l1 sstables - if res.is_none() { - let mut tables = Vec::with_capacity(state.levels[0].1.len()); - for table_id in state.levels[0].1.iter() { - let table = state.sstables[table_id].clone(); - if key < table.first_key() || key > table.last_key() { - continue; - } - tables.push(table); - } - - let concat_iter = ConcatIterator::create_and_seek_to_key(tables, key).unwrap(); - if !concat_iter.key().is_empty() && concat_iter.key() == key { - res = Some(Bytes::copy_from_slice(concat_iter.value())) - } else { - res = None; - } - } - - match res { - Some(value) => Ok(value), - None => Err(KeyNotFound), - } + self.inner.get(key) } pub fn scan(&self, lower: Bound<&[u8]>, upper: Bound<&[u8]>) -> Result { - let state = { - let guard = self.state.read().unwrap(); - guard.clone() - }; - - let mut iters = vec![]; - - // insert memtables from newest to oldest - iters.push(state.memtable.scan(lower, upper)); - - let frozen_tables = &state.frozen_memtables; - for frozen_table in frozen_tables { - iters.push(frozen_table.scan(lower, upper)); - } - let mem_iters = MergedIterator::new(iters); - - let mut table_iters = Vec::with_capacity(state.l0_sstables.len()); - // TODO: only consider sstables that might contain the key - for table_id in state.l0_sstables.iter() { - let table = state.sstables[table_id].clone(); - let iter = match lower { - Bound::Included(key) => SSTableIterator::create_and_seek_to_key(table, key)?, - Bound::Unbounded => SSTableIterator::create_and_seek_to_first(table)?, - Bound::Excluded(key) => { - let mut iter = SSTableIterator::create_and_seek_to_key(table, key)?; - - if iter.is_valid() && iter.key() == key { - iter.next()?; - } - iter - } - }; - - table_iters.push(iter); - } - - let concat_iter = { - let mut tables = Vec::with_capacity(state.levels[0].1.len()); - for table_id in state.levels[0].1.iter() { - let table = state.sstables[table_id].clone(); - tables.push(table); - } - match lower { - Bound::Included(key) => ConcatIterator::create_and_seek_to_key(tables, key)?, - Bound::Unbounded => ConcatIterator::create_and_seek_to_first(tables)?, - Bound::Excluded(key) => { - let mut iter = ConcatIterator::create_and_seek_to_key(tables, key)?; - - if iter.is_valid() && iter.key() == key { - iter.next()?; - } - iter - } - } - }; - - let sst_iters = MergedIterator::new(table_iters); - let mem_l0 = TwoMergeIterator::create(mem_iters, sst_iters).unwrap(); - let mem_l0_l1 = TwoMergeIterator::create(mem_l0, concat_iter).unwrap(); - Ok(LsmIterator::new(mem_l0_l1, map_bound(upper))) + self.inner.scan(lower, upper) } - fn try_freeze(&self, size: usize) -> Result<()> { - if size >= self.config.sst_size { - let lock = self.state_lock.lock().unwrap(); - self.freeze(&lock)?; - } - + pub fn close(&self) -> Result<()> { + self.inner.sync()?; Ok(()) } - - fn freeze(&self, state_lock: &MutexGuard<()>) -> Result<()> { - let mut guard = self.state.write().unwrap(); - let memtable = guard.memtable.clone(); - memtable.sync_wal()?; - - // check again, another thread might have frozen the memtable already. - if memtable.get_size() >= self.config.sst_size { - let mut frozen_memtables = guard.frozen_memtables.clone(); - frozen_memtables.insert(0, memtable); - - let id = self.get_sst_id(); - self.manifest - .add_record(state_lock, ManifestRecord::NewMemtable(id))?; - let memtable = self.create_memtable(id)?; - - *guard = Arc::new(StorageState { - memtable: Arc::new(memtable), - frozen_memtables, - l0_sstables: guard.l0_sstables.clone(), - sstables: guard.sstables.clone(), - levels: guard.levels.clone(), - }); - - drop(guard); - } - - Ok(()) - } - - fn create_memtable(&self, id: usize) -> Result { - let wal_path = Path::new(&self.config.db_dir).join(format!("{id}.wal")); - let memtable = match self.config.enable_wal { - true => Memtable::new_with_wal(id, wal_path.as_path()).expect("memtable with wal"), - _ => Memtable::new(id), - }; - - Ok(memtable) - } - - pub(crate) fn get_sst_id(&self) -> usize { - self.sst_id.fetch_add(1, SeqCst); - self.sst_id.load(SeqCst) - } - pub fn sync(&self) -> Result<()> { - self.state.read().unwrap().memtable.sync_wal() - } -} - -#[cfg(test)] -mod tests { - use std::str::from_utf8; - - use tempfile::tempdir; - - use crate::lsm_util::get_entries; - - use super::*; - - #[test] - fn filled_up_memtables_are_frozen() { - let config = Config { - sst_size: 4, - block_size: 32, - db_dir: String::from(tempdir().unwrap().path().to_str().unwrap()), - num_memtable_limit: 5, - enable_wal: true, - }; - let storage = new(config).unwrap(); - - let input = vec![b"1", b"2", b"3", b"4", b"5"]; - for entry in input { - storage.put(entry, entry).unwrap(); - } - - assert_eq!(2, storage.state.read().unwrap().frozen_memtables.len()); - assert_eq!(2, storage.state.read().unwrap().memtable.get_size()); - } - - #[test] - fn can_flush_frozen_memtable() { - let config = Config { - sst_size: 4, - block_size: 32, - db_dir: String::from(tempdir().unwrap().path().to_str().unwrap()), - num_memtable_limit: 5, - enable_wal: true, - }; - let storage = new(config).unwrap(); - - let input = vec![b"1", b"2", b"3", b"4", b"5"]; - for entry in input { - storage.put(entry, entry).unwrap(); - } - - assert_eq!(2, storage.state.read().unwrap().frozen_memtables.len()); - assert_eq!(2, storage.state.read().unwrap().memtable.get_size()); - - storage - .flush_frozen_memtable() - .expect("memtable was frozen"); - assert_eq!(1, storage.state.read().unwrap().frozen_memtables.len()); - } - - #[test] - fn loads_sstables() { - let db_dir = String::from(tempdir().unwrap().path().to_str().unwrap()); - let config = Config { - sst_size: 4, - block_size: 32, - db_dir: db_dir.clone(), - num_memtable_limit: 5, - enable_wal: true, - }; - let storage = new(config).unwrap(); - - let input = vec![b"1", b"2", b"3", b"4", b"5"]; - for entry in input { - storage.put(entry, entry).unwrap(); - } - storage - .flush_frozen_memtable() - .expect("memtable to have been frozen"); - - // new storage instance - let config = Config { - sst_size: 4, - block_size: 32, - db_dir: db_dir.clone(), - num_memtable_limit: 5, - enable_wal: true, - }; - - let storage = new(config).unwrap(); - assert_eq!(1, storage.state.read().unwrap().l0_sstables.len()); - assert_eq!(0, storage.state.read().unwrap().l0_sstables[0]); - } - - #[test] - fn scans_storage_with_empty_memtables_and_filled_sstables() { - let db_dir = String::from(tempdir().unwrap().path().to_str().unwrap()); - let config = Config { - sst_size: 4, - block_size: 32, - db_dir: db_dir.clone(), - num_memtable_limit: 5, - enable_wal: false, - }; - let storage = new(config).unwrap(); - - for (key, value) in get_entries() { - storage.put(key, value).unwrap(); - } - storage - .flush_frozen_memtable() - .expect("memtable to have been frozen"); - - // new storage instance - let config = Config { - sst_size: 4, - block_size: 32, - db_dir: db_dir.clone(), - num_memtable_limit: 5, - enable_wal: true, - }; - - let storage = new(config).unwrap(); - let mut iter = storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap(); - let mut res = vec![]; - - while iter.is_valid() { - res.push(iter.key().to_vec()); - let _ = iter.next(); - } - - assert_eq!(res, vec![b"a", b"b"]); - } - - #[test] - fn scans_through_filled_memtables_and_sstables() { - let db_dir = String::from(tempdir().unwrap().path().to_str().unwrap()); - let config = Config { - sst_size: 4, - block_size: 32, - db_dir: db_dir.clone(), - num_memtable_limit: 5, - enable_wal: true, - }; - let storage = new(config).unwrap(); - - for (key, value) in get_entries() { - storage.put(key, value).unwrap(); - } - - // will create sstables with a, b, c, d, e & f - storage - .flush_frozen_memtable() - .expect("memtable to have been frozen"); - storage - .flush_frozen_memtable() - .expect("memtable to have been frozen"); - storage - .flush_frozen_memtable() - .expect("memtable to have been frozen"); - - // new storage instance - let config = Config { - sst_size: 4, - block_size: 32, - db_dir: db_dir.clone(), - num_memtable_limit: 5, - enable_wal: true, - }; - - let new_entries = vec![(b"a", b"20"), (b"e", b"21"), (b"d", b"22"), (b"b", b"23")]; - let storage = new(config).unwrap(); - for (key, value) in new_entries { - let _ = storage.put(key, value); - } - - let mut iter = storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap(); - let mut keys = vec![]; - let mut values = vec![]; - - while iter.is_valid() { - let k = from_utf8(iter.key()).unwrap(); - let v = from_utf8(iter.value()).unwrap(); - - keys.push(String::from(k)); - values.push(String::from(v)); - - let _ = iter.next(); - } - - // expect the first frozen memtable to have id 3 - // we already have sstables with id 0, 1 & 2 - // we use .last here because frozen memtables are stored newest to oldest - // with oldest being the first memtable to be frozen - assert_eq!( - 3, - storage - .state - .read() - .unwrap() - .frozen_memtables - .last() - .unwrap() - .id - ); - - assert_eq!( - keys, - vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l"] - ); - assert_eq!( - values, - vec![ - "20", "23", "3", "22", "21", "6", "7", "8", "9", "10", "11", "12" - ] - ); - } - - #[test] - fn reads_the_latest_version_of_a_key() { - let db_dir = String::from(tempdir().unwrap().path().to_str().unwrap()); - let config = Config { - sst_size: 4, - block_size: 32, - db_dir: db_dir.clone(), - num_memtable_limit: 5, - enable_wal: true, - }; - let storage = new(config).unwrap(); - - for (key, value) in get_entries() { - storage.put(key, value).unwrap(); - } - - // will create sstables with a, b, c, d, e & f - storage - .flush_frozen_memtable() - .expect("memtable to have been frozen"); - storage - .flush_frozen_memtable() - .expect("memtable to have been frozen"); - storage - .flush_frozen_memtable() - .expect("memtable to have been frozen"); - - // new storage instance - let config = Config { - sst_size: 4, - block_size: 32, - db_dir: db_dir.clone(), - num_memtable_limit: 5, - enable_wal: false, - }; - - let new_entries = vec![(b"a", b"20"), (b"e", b"21"), (b"d", b"22"), (b"b", b"23")]; - let storage = new(config).unwrap(); - for (key, value) in new_entries { - let _ = storage.put(key, value); - } - - // this will create an sst with a & e - storage - .flush_frozen_memtable() - .expect("memtable to have been frozen"); - - let mut iter = storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap(); - let mut keys = vec![]; - let mut values = vec![]; - - while iter.is_valid() { - let k = from_utf8(iter.key()).unwrap(); - let v = from_utf8(iter.value()).unwrap(); - keys.push(String::from(k)); - values.push(String::from(v)); - - let _ = iter.next(); - } - - assert_eq!(keys, vec!["a", "b", "c", "d", "e", "f"]); - assert_eq!(values, vec!["20", "23", "3", "22", "21", "6"]); - } - - #[test] - fn get_key_within_range() { - let db_dir = String::from(tempdir().unwrap().path().to_str().unwrap()); - let config = Config { - sst_size: 4, - block_size: 32, - db_dir: db_dir.clone(), - num_memtable_limit: 5, - enable_wal: true, - }; - let storage = new(config).unwrap(); - - for (key, value) in get_entries() { - storage.put(key, value).unwrap(); - } - - // will create sstables with a, b, c, d, e & f - storage - .flush_frozen_memtable() - .expect("memtable to have been frozen"); - storage.trigger_compaction().expect("compacted"); - - storage - .flush_frozen_memtable() - .expect("memtable to have been frozen"); - storage - .flush_frozen_memtable() - .expect("memtable to have been frozen"); - - let new_entries = vec![(b"a", b"20"), (b"e", b"21"), (b"d", b"22"), (b"b", b"23")]; - for (key, value) in new_entries { - let _ = storage.put(key, value); - } - - // this will create an sst with a & e - storage - .flush_frozen_memtable() - .expect("memtable to have been frozen"); - - let mut iter = storage - .scan(Bound::Included(b"d"), Bound::Included(b"f")) - .unwrap(); - let mut keys = vec![]; - let mut values = vec![]; - - while iter.is_valid() { - let k = from_utf8(iter.key()).unwrap(); - let v = from_utf8(iter.value()).unwrap(); - - keys.push(String::from(k)); - values.push(String::from(v)); - - let _ = iter.next(); - } - - assert_eq!(keys, vec!["d", "e", "f"]); - assert_eq!(values, vec!["22", "21", "6"]); - } - - #[test] - fn test_manifest_recovery() { - let db_dir = String::from(tempdir().unwrap().path().to_str().unwrap()); - let config = Config { - sst_size: 4, - block_size: 32, - db_dir: db_dir.clone(), - num_memtable_limit: 5, - enable_wal: true, - }; - let storage = new(config).unwrap(); - - for (key, value) in get_entries() { - storage.put(key, value).unwrap(); - } - - // will create sstables with a, b, c, d, e & f - storage - .flush_frozen_memtable() - .expect("memtable to have been frozen"); - storage.trigger_compaction().expect("compacted"); - - storage - .flush_frozen_memtable() - .expect("memtable to have been frozen"); - storage.trigger_compaction().expect("compacted"); - - storage - .flush_frozen_memtable() - .expect("memtable to have been frozen"); - storage - .flush_frozen_memtable() - .expect("memtable to have been frozen"); - - let config = Config { - sst_size: 4, - block_size: 32, - db_dir: db_dir.clone(), - num_memtable_limit: 5, - enable_wal: true, - }; - let storage = new(config).unwrap(); - - let state = { - let guard = storage.state.read().unwrap(); - guard.clone() - }; - - assert_eq!(state.l0_sstables, [3, 2]); - assert_eq!(state.levels[0].1, [8]) - } } diff --git a/storage/src/lsm_storage_inner.rs b/storage/src/lsm_storage_inner.rs new file mode 100644 index 0000000..ffa21de --- /dev/null +++ b/storage/src/lsm_storage_inner.rs @@ -0,0 +1,729 @@ +use anyhow::Result; +use bytes::Bytes; +use std::{ + collections::HashMap, + ops::Bound, + path::Path, + sync::{ + Arc, Mutex, MutexGuard, RwLock, + atomic::{AtomicUsize, Ordering::SeqCst}, + }, +}; + +use crate::{ + SSTable, SSTableIterator, + common::{errors::KeyNotFound, iterator::StorageIterator}, + iterators::{ + concat_iterator::ConcatIterator, lsm_iterator::LsmIterator, + merged_iterator::MergedIterator, two_merge_iterator::TwoMergeIterator, + }, + lsm_util::{create_db_dir, load_sstables}, + manifest::{Manifest, ManifestRecord}, + memtable::{memtable_iterator::map_bound, table::Memtable}, + sst::BlockCache, +}; + +#[derive(Debug)] +pub struct StorageInner { + pub(crate) state: RwLock>, + pub(crate) config: Config, + pub(crate) block_cache: Arc, + pub(crate) state_lock: Mutex<()>, + pub(crate) sst_id: AtomicUsize, + pub(crate) manifest: Manifest, +} + +#[derive(Debug)] +pub(crate) struct StorageState { + pub(crate) memtable: Arc, + pub(crate) frozen_memtables: Vec>, + pub(crate) l0_sstables: Vec, + pub(crate) sstables: HashMap>, + pub(crate) levels: Vec<(usize, Vec)>, +} + +#[derive(Debug)] +pub struct Config { + pub sst_size: usize, + pub block_size: usize, + pub num_memtable_limit: usize, + pub db_dir: String, + pub enable_wal: bool, +} + +pub fn new(config: Config) -> Result> { + let state_lock = Mutex::new(()); + let block_cache = Arc::new(BlockCache::new(4096)); + let db_dir = Path::new(&config.db_dir); + + create_db_dir(db_dir); + + let manifest; + let mut manifest_records: Vec = vec![]; + let manifest_file = db_dir.join("manifest"); + match Manifest::recover(&manifest_file) { + Ok((man, manifest_recs)) => { + manifest = man; + manifest_records = manifest_recs; + } + Err(_) => manifest = Manifest::create(manifest_file)?, + } + + let (memtable_ids, l0_sst_ids, l1_sst_ids, sstables) = + load_sstables(db_dir, block_cache, manifest_records)?; + let sst_id = match ([&memtable_ids[..], &l0_sst_ids[..], &l1_sst_ids[..]].concat()) + .iter() + .max() + { + Some(id) => id + 1, + None => 0, + }; + + let (memtable, frozen_memtables) = match config.enable_wal { + true => { + let wal_path = db_dir.join(format!("{sst_id}.wal")); + if memtable_ids.is_empty() { + let memtable = Memtable::new_with_wal(sst_id, wal_path.as_path())?; + manifest + .add_record( + &state_lock.lock().unwrap(), + ManifestRecord::NewMemtable(sst_id), + ) + .expect("added manifest record"); + + (Arc::new(memtable), vec![]) + } else { + let mut memtables = vec![]; + for id in memtable_ids { + let wal_path = db_dir.join(format!("{id}.wal")); + let memtable = Memtable::new_with_wal(id, wal_path.as_path())?; + memtables.push(Arc::new(memtable)); + } + + (memtables.remove(0), memtables) + } + } + _ => { + manifest + .add_record( + &state_lock.lock().unwrap(), + ManifestRecord::NewMemtable(sst_id), + ) + .expect("added manifest record"); + + (Arc::new(Memtable::new(sst_id)), vec![]) + } + }; + + if memtable.get_size() == 0 { + manifest + .add_record( + &state_lock.lock().unwrap(), + ManifestRecord::NewMemtable(sst_id), + ) + .expect("added manifest record"); + } + + Ok(Arc::new(StorageInner { + config, + state_lock, + manifest, + sst_id: AtomicUsize::new(sst_id), + block_cache: Arc::new(BlockCache::new(1 << 20)), // 1mb cache + state: RwLock::new(Arc::new(StorageState { + sstables, + frozen_memtables, + memtable, + l0_sstables: l0_sst_ids, + levels: vec![(0, l1_sst_ids)], + })), + })) +} + +impl StorageInner { + pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { + let size; + + { + let guard = self.state.read().unwrap(); + guard.memtable.put(key, value)?; + size = guard.memtable.get_size(); + } + + self.try_freeze(size)?; + Ok(()) + } + + pub fn get(&self, key: &[u8]) -> Result { + let state = { + let guard = self.state.read().unwrap(); + guard.clone() + }; + + let mut res = state.memtable.get(key); + + // search in frozen memtables + if res.is_none() { + for frozen_table in state.frozen_memtables.clone() { + if frozen_table.get(key).is_some() { + res = frozen_table.get(key); + break; + } + } + } + + // search in l0 ssts + if res.is_none() { + let mut table_iters = Vec::with_capacity(state.l0_sstables.len()); + for table_id in state.l0_sstables.iter() { + let table = state.sstables[table_id].clone(); + if key < table.first_key() || key > table.last_key() { + continue; + } + + let iter = SSTableIterator::create_and_seek_to_key(table, key).unwrap(); + table_iters.push(iter); + } + + let merged_iter = MergedIterator::new(table_iters); + if !merged_iter.key().is_empty() && merged_iter.key() == key { + res = Some(Bytes::copy_from_slice(merged_iter.value())) + } else { + res = None; + } + } + + // search in l1 sstables + if res.is_none() { + let mut tables = Vec::with_capacity(state.levels[0].1.len()); + for table_id in state.levels[0].1.iter() { + let table = state.sstables[table_id].clone(); + if key < table.first_key() || key > table.last_key() { + continue; + } + tables.push(table); + } + + let concat_iter = ConcatIterator::create_and_seek_to_key(tables, key).unwrap(); + if !concat_iter.key().is_empty() && concat_iter.key() == key { + res = Some(Bytes::copy_from_slice(concat_iter.value())) + } else { + res = None; + } + } + + match res { + Some(value) => Ok(value), + None => Err(KeyNotFound), + } + } + + pub fn scan(&self, lower: Bound<&[u8]>, upper: Bound<&[u8]>) -> Result { + let state = { + let guard = self.state.read().unwrap(); + guard.clone() + }; + + let mut iters = vec![]; + + // insert memtables from newest to oldest + iters.push(state.memtable.scan(lower, upper)); + + let frozen_tables = &state.frozen_memtables; + for frozen_table in frozen_tables { + iters.push(frozen_table.scan(lower, upper)); + } + let mem_iters = MergedIterator::new(iters); + + let mut table_iters = Vec::with_capacity(state.l0_sstables.len()); + // TODO: only consider sstables that might contain the key + for table_id in state.l0_sstables.iter() { + let table = state.sstables[table_id].clone(); + let iter = match lower { + Bound::Included(key) => SSTableIterator::create_and_seek_to_key(table, key)?, + Bound::Unbounded => SSTableIterator::create_and_seek_to_first(table)?, + Bound::Excluded(key) => { + let mut iter = SSTableIterator::create_and_seek_to_key(table, key)?; + + if iter.is_valid() && iter.key() == key { + iter.next()?; + } + iter + } + }; + + table_iters.push(iter); + } + + let concat_iter = { + let mut tables = Vec::with_capacity(state.levels[0].1.len()); + for table_id in state.levels[0].1.iter() { + let table = state.sstables[table_id].clone(); + tables.push(table); + } + match lower { + Bound::Included(key) => ConcatIterator::create_and_seek_to_key(tables, key)?, + Bound::Unbounded => ConcatIterator::create_and_seek_to_first(tables)?, + Bound::Excluded(key) => { + let mut iter = ConcatIterator::create_and_seek_to_key(tables, key)?; + + if iter.is_valid() && iter.key() == key { + iter.next()?; + } + iter + } + } + }; + + let sst_iters = MergedIterator::new(table_iters); + let mem_l0 = TwoMergeIterator::create(mem_iters, sst_iters).unwrap(); + let mem_l0_l1 = TwoMergeIterator::create(mem_l0, concat_iter).unwrap(); + Ok(LsmIterator::new(mem_l0_l1, map_bound(upper))) + } + + fn try_freeze(&self, size: usize) -> Result<()> { + if size >= self.config.sst_size { + let lock = self.state_lock.lock().unwrap(); + self.freeze(&lock)?; + } + + Ok(()) + } + + fn freeze(&self, state_lock: &MutexGuard<()>) -> Result<()> { + let mut guard = self.state.write().unwrap(); + let memtable = guard.memtable.clone(); + memtable.sync_wal()?; + + // check again, another thread might have frozen the memtable already. + if memtable.get_size() >= self.config.sst_size { + let mut frozen_memtables = guard.frozen_memtables.clone(); + frozen_memtables.insert(0, memtable); + + let id = self.get_sst_id(); + self.manifest + .add_record(state_lock, ManifestRecord::NewMemtable(id))?; + let memtable = self.create_memtable(id)?; + + *guard = Arc::new(StorageState { + memtable: Arc::new(memtable), + frozen_memtables, + l0_sstables: guard.l0_sstables.clone(), + sstables: guard.sstables.clone(), + levels: guard.levels.clone(), + }); + + drop(guard); + } + + Ok(()) + } + + fn create_memtable(&self, id: usize) -> Result { + let wal_path = Path::new(&self.config.db_dir).join(format!("{id}.wal")); + let memtable = match self.config.enable_wal { + true => Memtable::new_with_wal(id, wal_path.as_path()).expect("memtable with wal"), + _ => Memtable::new(id), + }; + + Ok(memtable) + } + + pub(crate) fn get_sst_id(&self) -> usize { + self.sst_id.fetch_add(1, SeqCst); + self.sst_id.load(SeqCst) + } + + pub fn sync(&self) -> Result<()> { + self.state.read().unwrap().memtable.sync_wal() + } +} + +#[cfg(test)] +mod tests { + use std::str::from_utf8; + + use tempfile::tempdir; + + use crate::lsm_util::get_entries; + + use super::*; + + #[test] + fn filled_up_memtables_are_frozen() { + let config = Config { + sst_size: 4, + block_size: 32, + db_dir: String::from(tempdir().unwrap().path().to_str().unwrap()), + num_memtable_limit: 5, + enable_wal: true, + }; + let storage = new(config).unwrap(); + + let input = vec![b"1", b"2", b"3", b"4", b"5"]; + for entry in input { + storage.put(entry, entry).unwrap(); + } + + assert_eq!(2, storage.state.read().unwrap().frozen_memtables.len()); + assert_eq!(2, storage.state.read().unwrap().memtable.get_size()); + } + + #[test] + fn can_flush_frozen_memtable() { + let config = Config { + sst_size: 4, + block_size: 32, + db_dir: String::from(tempdir().unwrap().path().to_str().unwrap()), + num_memtable_limit: 5, + enable_wal: true, + }; + let storage = new(config).unwrap(); + + let input = vec![b"1", b"2", b"3", b"4", b"5"]; + for entry in input { + storage.put(entry, entry).unwrap(); + } + + assert_eq!(2, storage.state.read().unwrap().frozen_memtables.len()); + assert_eq!(2, storage.state.read().unwrap().memtable.get_size()); + + storage + .flush_frozen_memtable() + .expect("memtable was frozen"); + assert_eq!(1, storage.state.read().unwrap().frozen_memtables.len()); + } + + #[test] + fn loads_sstables() { + let db_dir = String::from(tempdir().unwrap().path().to_str().unwrap()); + let config = Config { + sst_size: 4, + block_size: 32, + db_dir: db_dir.clone(), + num_memtable_limit: 5, + enable_wal: true, + }; + let storage = new(config).unwrap(); + + let input = vec![b"1", b"2", b"3", b"4", b"5"]; + for entry in input { + storage.put(entry, entry).unwrap(); + } + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + + // new storage instance + let config = Config { + sst_size: 4, + block_size: 32, + db_dir: db_dir.clone(), + num_memtable_limit: 5, + enable_wal: true, + }; + + let storage = new(config).unwrap(); + assert_eq!(1, storage.state.read().unwrap().l0_sstables.len()); + assert_eq!(0, storage.state.read().unwrap().l0_sstables[0]); + } + + #[test] + fn scans_storage_with_empty_memtables_and_filled_sstables() { + let db_dir = String::from(tempdir().unwrap().path().to_str().unwrap()); + let config = Config { + sst_size: 4, + block_size: 32, + db_dir: db_dir.clone(), + num_memtable_limit: 5, + enable_wal: false, + }; + let storage = new(config).unwrap(); + + for (key, value) in get_entries() { + storage.put(key, value).unwrap(); + } + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + + // new storage instance + let config = Config { + sst_size: 4, + block_size: 32, + db_dir: db_dir.clone(), + num_memtable_limit: 5, + enable_wal: true, + }; + + let storage = new(config).unwrap(); + let mut iter = storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap(); + let mut res = vec![]; + + while iter.is_valid() { + res.push(iter.key().to_vec()); + let _ = iter.next(); + } + + assert_eq!(res, vec![b"a", b"b"]); + } + + #[test] + fn scans_through_filled_memtables_and_sstables() { + let db_dir = String::from(tempdir().unwrap().path().to_str().unwrap()); + let config = Config { + sst_size: 4, + block_size: 32, + db_dir: db_dir.clone(), + num_memtable_limit: 5, + enable_wal: true, + }; + let storage = new(config).unwrap(); + + for (key, value) in get_entries() { + storage.put(key, value).unwrap(); + } + + // will create sstables with a, b, c, d, e & f + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + + // new storage instance + let config = Config { + sst_size: 4, + block_size: 32, + db_dir: db_dir.clone(), + num_memtable_limit: 5, + enable_wal: true, + }; + + let new_entries = vec![(b"a", b"20"), (b"e", b"21"), (b"d", b"22"), (b"b", b"23")]; + let storage = new(config).unwrap(); + for (key, value) in new_entries { + let _ = storage.put(key, value); + } + + let mut iter = storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap(); + let mut keys = vec![]; + let mut values = vec![]; + + while iter.is_valid() { + let k = from_utf8(iter.key()).unwrap(); + let v = from_utf8(iter.value()).unwrap(); + + keys.push(String::from(k)); + values.push(String::from(v)); + + let _ = iter.next(); + } + + // expect the first frozen memtable to have id 3 + // we already have sstables with id 0, 1 & 2 + // we use .last here because frozen memtables are stored newest to oldest + // with oldest being the first memtable to be frozen + assert_eq!( + 3, + storage + .state + .read() + .unwrap() + .frozen_memtables + .last() + .unwrap() + .id + ); + + assert_eq!( + keys, + vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l"] + ); + assert_eq!( + values, + vec![ + "20", "23", "3", "22", "21", "6", "7", "8", "9", "10", "11", "12" + ] + ); + } + + #[test] + fn reads_the_latest_version_of_a_key() { + let db_dir = String::from(tempdir().unwrap().path().to_str().unwrap()); + let config = Config { + sst_size: 4, + block_size: 32, + db_dir: db_dir.clone(), + num_memtable_limit: 5, + enable_wal: true, + }; + let storage = new(config).unwrap(); + + for (key, value) in get_entries() { + storage.put(key, value).unwrap(); + } + + // will create sstables with a, b, c, d, e & f + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + + // new storage instance + let config = Config { + sst_size: 4, + block_size: 32, + db_dir: db_dir.clone(), + num_memtable_limit: 5, + enable_wal: false, + }; + + let new_entries = vec![(b"a", b"20"), (b"e", b"21"), (b"d", b"22"), (b"b", b"23")]; + let storage = new(config).unwrap(); + for (key, value) in new_entries { + let _ = storage.put(key, value); + } + + // this will create an sst with a & e + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + + let mut iter = storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap(); + let mut keys = vec![]; + let mut values = vec![]; + + while iter.is_valid() { + let k = from_utf8(iter.key()).unwrap(); + let v = from_utf8(iter.value()).unwrap(); + keys.push(String::from(k)); + values.push(String::from(v)); + + let _ = iter.next(); + } + + assert_eq!(keys, vec!["a", "b", "c", "d", "e", "f"]); + assert_eq!(values, vec!["20", "23", "3", "22", "21", "6"]); + } + + #[test] + fn get_key_within_range() { + let db_dir = String::from(tempdir().unwrap().path().to_str().unwrap()); + let config = Config { + sst_size: 4, + block_size: 32, + db_dir: db_dir.clone(), + num_memtable_limit: 5, + enable_wal: true, + }; + let storage = new(config).unwrap(); + + for (key, value) in get_entries() { + storage.put(key, value).unwrap(); + } + + // will create sstables with a, b, c, d, e & f + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + storage.trigger_compaction().expect("compacted"); + + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + + let new_entries = vec![(b"a", b"20"), (b"e", b"21"), (b"d", b"22"), (b"b", b"23")]; + for (key, value) in new_entries { + let _ = storage.put(key, value); + } + + // this will create an sst with a & e + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + + let mut iter = storage + .scan(Bound::Included(b"d"), Bound::Included(b"f")) + .unwrap(); + let mut keys = vec![]; + let mut values = vec![]; + + while iter.is_valid() { + let k = from_utf8(iter.key()).unwrap(); + let v = from_utf8(iter.value()).unwrap(); + + keys.push(String::from(k)); + values.push(String::from(v)); + + let _ = iter.next(); + } + + assert_eq!(keys, vec!["d", "e", "f"]); + assert_eq!(values, vec!["22", "21", "6"]); + } + + #[test] + fn test_manifest_recovery() { + let db_dir = String::from(tempdir().unwrap().path().to_str().unwrap()); + let config = Config { + sst_size: 4, + block_size: 32, + db_dir: db_dir.clone(), + num_memtable_limit: 5, + enable_wal: true, + }; + let storage = new(config).unwrap(); + + for (key, value) in get_entries() { + storage.put(key, value).unwrap(); + } + + // will create sstables with a, b, c, d, e & f + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + storage.trigger_compaction().expect("compacted"); + + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + storage.trigger_compaction().expect("compacted"); + + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + + let config = Config { + sst_size: 4, + block_size: 32, + db_dir: db_dir.clone(), + num_memtable_limit: 5, + enable_wal: true, + }; + let storage = new(config).unwrap(); + + let state = { + let guard = storage.state.read().unwrap(); + guard.clone() + }; + + assert_eq!(state.l0_sstables, [3, 2]); + assert_eq!(state.levels[0].1, [8]) + } +}