diff --git a/.gitignore b/.gitignore index ad67955..bffb3f8 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,6 @@ target # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ + +# cabin artifacts +.cabin diff --git a/storage/src/iterators/merged_iterator.rs b/storage/src/iterators/merged_iterator.rs index 70d82ee..be65a06 100644 --- a/storage/src/iterators/merged_iterator.rs +++ b/storage/src/iterators/merged_iterator.rs @@ -170,7 +170,7 @@ mod test { let mut res = vec![]; for item in items { - let memtable = Memtable::default(); + let memtable = Memtable::new(0); for (key, val) in item { let _ = memtable.put(key, val); diff --git a/storage/src/lib.rs b/storage/src/lib.rs index a097776..7a51736 100644 --- a/storage/src/lib.rs +++ b/storage/src/lib.rs @@ -1,6 +1,7 @@ pub mod common; pub mod iterators; pub mod lsm_storage; +mod lsm_util; pub mod memtable; mod block; diff --git a/storage/src/lsm_storage.rs b/storage/src/lsm_storage.rs index 1fd8f50..103da63 100644 --- a/storage/src/lsm_storage.rs +++ b/storage/src/lsm_storage.rs @@ -1,16 +1,21 @@ use std::{ collections::HashMap, ops::Bound, - sync::{Arc, Mutex, MutexGuard, RwLock}, + path::Path, + sync::{ + Arc, Mutex, MutexGuard, RwLock, + atomic::{AtomicUsize, Ordering::SeqCst}, + }, }; use crate::{ - SSTable, SSTableIterator, + SSTable, SSTableBuilder, SSTableIterator, common::{errors::KeyNotFound, iterator::StorageIterator}, iterators::{ lsm_iterator::LsmIterator, merged_iterator::MergedIterator, two_merge_iterator::TwoMergeIterator, }, + lsm_util::{create_db_dir, load_sstables}, memtable::{memtable_iterator::map_bound, table::Memtable}, sst::BlockCache, }; @@ -22,6 +27,7 @@ pub struct Storage { state: RwLock>, block_cache: Arc, state_lock: Mutex<()>, + sst_id: AtomicUsize, config: Config, } @@ -36,18 +42,31 @@ struct StorageState { #[derive(Debug)] pub struct Config { pub sst_size: usize, + pub block_size: usize, + pub db_dir: String, } pub fn new(config: Config) -> Storage { + let db_dir = Path::new(&config.db_dir); + create_db_dir(db_dir); + let block_cache = Arc::new(BlockCache::new(1 << 20)); // 4gb + let (l0_sstables, sstables) = load_sstables(db_dir, block_cache).expect("loaded sstables"); + + let sst_id = match l0_sstables.iter().max() { + Some(id) => id + 1, + None => 0, + }; + Storage { config, + sst_id: AtomicUsize::new(sst_id), state_lock: Mutex::new(()), block_cache: Arc::new(BlockCache::new(1 << 20)), // 4gb cache state: RwLock::new(Arc::new(StorageState { - memtable: Arc::new(Memtable::default()), + memtable: Arc::new(Memtable::new(sst_id)), frozen_memtables: Vec::new(), - l0_sstables: vec![], - sstables: HashMap::new(), + l0_sstables, + sstables, })), } } @@ -74,7 +93,7 @@ impl Storage { let mut res = state.memtable.get(key); - // search through frozen memtables + // search in frozen memtables if res.is_none() { for frozen_table in state.frozen_memtables.clone() { if frozen_table.get(key).is_some() { @@ -84,11 +103,15 @@ impl Storage { } } - // search through ssts + // search in ssts if res.is_none() { let mut table_iters = Vec::with_capacity(state.l0_sstables.len()); for table_id in state.l0_sstables.iter() { let table = state.sstables[table_id].clone(); + if key < table.first_key() || key > table.last_key() { + continue; + } + let iter = SSTableIterator::create_and_seek_to_key(table, key).unwrap(); table_iters.push(iter); } @@ -165,8 +188,11 @@ impl Storage { let mut frozen_memtables = guard.frozen_memtables.clone(); frozen_memtables.insert(0, memtable); + self.inc_sst_id(); + let id = self.sst_id.load(SeqCst); + *guard = Arc::new(StorageState { - memtable: Arc::new(Memtable::default()), + memtable: Arc::new(Memtable::new(id)), frozen_memtables, l0_sstables: guard.l0_sstables.clone(), sstables: guard.sstables.clone(), @@ -175,15 +201,64 @@ impl Storage { drop(guard); } } + + fn flush_frozen_memtable(&self) -> Result<()> { + let mut sst_builder = SSTableBuilder::new(self.config.block_size); + let mut guard = self.state.write().unwrap(); + + let mut memtables = guard.frozen_memtables.clone(); + let mut l0_sstables = guard.l0_sstables.clone(); + let mut sstables = guard.sstables.clone(); + + let Some(memtable) = memtables.pop() else { + return Ok(()); + }; + memtable.flush(&mut sst_builder)?; + + let sst = sst_builder.build( + memtable.id, + self.block_cache.clone(), + self.sst_path(memtable.id), + )?; + l0_sstables.push(memtable.id); + sstables.insert(memtable.id, Arc::new(sst)); + + *guard = Arc::new(StorageState { + memtable: guard.memtable.clone(), + frozen_memtables: memtables, + l0_sstables, + sstables, + }); + + Ok(()) + } + + fn inc_sst_id(&self) -> usize { + self.sst_id.fetch_add(1, SeqCst) + } + + fn sst_path(&self, id: usize) -> String { + format!("{}/sst/{}.sst", self.config.db_dir, id) + } } #[cfg(test)] mod tests { + use std::str::from_utf8; + + use tempfile::tempdir; + + use crate::lsm_util::{self, get_entries}; + use super::*; #[test] fn filled_up_memtables_are_frozen() { - let config = Config { sst_size: 4 }; + let config = Config { + sst_size: 4, + block_size: 32, + db_dir: String::from(tempdir().unwrap().path().to_str().unwrap()), + }; let storage = new(config); let input = vec![b"1", b"2", b"3", b"4", b"5"]; @@ -194,4 +269,154 @@ mod tests { assert_eq!(2, storage.state.read().unwrap().frozen_memtables.len()); assert_eq!(2, storage.state.read().unwrap().memtable.get_size()); } + + #[test] + fn can_flush_frozen_memtable() { + let config = Config { + sst_size: 4, + block_size: 32, + db_dir: String::from(tempdir().unwrap().path().to_str().unwrap()), + }; + let storage = new(config); + + let input = vec![b"1", b"2", b"3", b"4", b"5"]; + for entry in input { + storage.put(entry, entry).unwrap(); + } + + assert_eq!(2, storage.state.read().unwrap().frozen_memtables.len()); + assert_eq!(2, storage.state.read().unwrap().memtable.get_size()); + + storage + .flush_frozen_memtable() + .expect("memtable was frozen"); + assert_eq!(1, storage.state.read().unwrap().frozen_memtables.len()); + } + + #[test] + fn loads_sstables() { + let db_dir = String::from(tempdir().unwrap().path().to_str().unwrap()); + let config = Config { + sst_size: 4, + block_size: 32, + db_dir: db_dir.clone(), + }; + let storage = new(config); + + let input = vec![b"1", b"2", b"3", b"4", b"5"]; + for entry in input { + storage.put(entry, entry).unwrap(); + } + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + + // new storage instance + let config = Config { + sst_size: 4, + block_size: 32, + db_dir: db_dir.clone(), + }; + + let storage = new(config); + assert_eq!(1, storage.state.read().unwrap().l0_sstables.len()); + assert_eq!(0, storage.state.read().unwrap().l0_sstables[0]); + } + + #[test] + fn scans_storage_with_empty_memtables_and_filled_sstables() { + let db_dir = String::from(tempdir().unwrap().path().to_str().unwrap()); + let config = Config { + sst_size: 4, + block_size: 32, + db_dir: db_dir.clone(), + }; + let storage = new(config); + + for (key, value) in get_entries() { + storage.put(key, value).unwrap(); + } + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + + // new storage instance + let config = Config { + sst_size: 4, + block_size: 32, + db_dir: db_dir.clone(), + }; + + let storage = new(config); + let mut iter = storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap(); + let mut res = vec![]; + + while iter.is_valid() { + res.push(iter.key().to_vec()); + let _ = iter.next(); + } + + assert_eq!(res, vec![b"a", b"b"]); + } + + #[test] + fn scans_through_filled_memtables_and_sstables() { + let db_dir = String::from(tempdir().unwrap().path().to_str().unwrap()); + let config = Config { + sst_size: 4, + block_size: 32, + db_dir: db_dir.clone(), + }; + let storage = new(config); + + for (key, value) in get_entries() { + storage.put(key, value).unwrap(); + } + + // will create sstables with a, b, c, d, e & f + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + + // new storage instance + let config = Config { + sst_size: 4, + block_size: 32, + db_dir: db_dir.clone(), + }; + + let new_entries = vec![(b"a", b"20"), (b"e", b"21"), (b"d", b"22"), (b"b", b"23")]; + let storage = new(config); + for (key, value) in new_entries { + let _ = storage.put(key, value); + } + + let mut iter = storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap(); + let mut keys = vec![]; + let mut values = vec![]; + + while iter.is_valid() { + println!( + "key: {:?}, value: {:?}", + from_utf8(iter.key()).unwrap(), + from_utf8(iter.value()).unwrap() + ); + let k = from_utf8(iter.key()).unwrap(); + let v = from_utf8(iter.value()).unwrap(); + + keys.push(String::from(k)); + values.push(String::from(v)); + + let _ = iter.next(); + } + + assert_eq!(keys, vec!["a", "b", "c", "d", "e", "f"]); + assert_eq!(values, vec!["20", "23", "3", "22", "21", "6"]); + } } diff --git a/storage/src/lsm_util.rs b/storage/src/lsm_util.rs new file mode 100644 index 0000000..d6507e9 --- /dev/null +++ b/storage/src/lsm_util.rs @@ -0,0 +1,61 @@ +use std::{collections::HashMap, fs, path::Path, sync::Arc}; + +use anyhow::{Result, anyhow}; + +use crate::{FileObject, SSTable, sst::BlockCache}; + +pub(crate) fn load_sstables( + path: &Path, + block_cache: Arc, +) -> Result<(Vec, HashMap>)> { + let mut l0_sstables = vec![]; + let mut sstables = HashMap::new(); + + for entry in fs::read_dir(path.join("sst")).unwrap() { + match entry { + Ok(dir_entry) => { + let sst_path = dir_entry.path(); + + let split_path = sst_path + .file_name() + .unwrap() + .to_str() + .unwrap() + .split(".") + .collect::>(); + + let sst_id = split_path.first().unwrap().parse().unwrap(); + let file = FileObject::open(sst_path.as_path()).expect("failed to open file"); + let sst = SSTable::open(sst_id, block_cache.clone(), file) + .expect("failed to open sstable"); + + l0_sstables.push(sst.id); + sstables.insert(sst.id, Arc::new(sst)); + } + Err(err) => return Err(anyhow!("{:?}", err)), + } + } + + anyhow::Ok((l0_sstables, sstables)) +} + +pub(crate) fn create_db_dir(path: &Path) { + fs::create_dir_all(path.join("sst")).expect("failed to create db dir"); +} + +pub fn get_entries() -> Vec<(&'static [u8], &'static [u8])> { + vec![ + (b"a", b"1"), + (b"b", b"2"), + (b"c", b"3"), + (b"d", b"4"), + (b"e", b"5"), + (b"f", b"6"), + (b"g", b"7"), + (b"h", b"8"), + (b"i", b"9"), + (b"j", b"10"), + (b"k", b"11"), + (b"l", b"12"), + ] +} diff --git a/storage/src/memtable/table.rs b/storage/src/memtable/table.rs index ad7b335..3f710a1 100644 --- a/storage/src/memtable/table.rs +++ b/storage/src/memtable/table.rs @@ -10,9 +10,23 @@ use anyhow::Result; use bytes::Bytes; use crossbeam_skiplist::SkipMap; -use crate::memtable::memtable_iterator::MemtableIterator; +use crate::{SSTableBuilder, memtable::memtable_iterator::MemtableIterator}; + +#[derive(Debug, Clone)] +pub struct Memtable { + pub(crate) size: Arc, + pub(crate) id: usize, + skip_map: Arc>, +} impl Memtable { + pub fn new(id: usize) -> Self { + Self { + skip_map: Arc::new(SkipMap::new()), + size: Arc::new(AtomicUsize::new(0)), + id, + } + } pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { self.skip_map .insert(Bytes::copy_from_slice(key), Bytes::copy_from_slice(value)); @@ -34,21 +48,18 @@ impl Memtable { pub fn scan(&self, lower: Bound<&[u8]>, upper: Bound<&[u8]>) -> MemtableIterator { MemtableIterator::create(self.skip_map.clone(), lower, upper) } -} -impl Default for Memtable { - fn default() -> Self { - Memtable { - skip_map: Arc::new(SkipMap::new()), - size: Arc::new(AtomicUsize::new(0)), + pub fn flush(&self, builder: &mut SSTableBuilder) -> Result<()> { + for entry in self.skip_map.iter() { + if entry.value().is_empty() { + continue; + } + + builder.add(entry.key(), entry.value()); } - } -} -#[derive(Debug, Clone)] -pub struct Memtable { - pub(crate) size: Arc, - skip_map: Arc>, + Ok(()) + } } #[cfg(test)] @@ -57,7 +68,7 @@ mod tests { #[test] fn can_put_and_get_items() { - let memtable = Memtable::default(); + let memtable = Memtable::new(1); let _ = memtable.put(b"1", b"2"); let out = &memtable.get(b"1").unwrap()[..]; @@ -66,7 +77,7 @@ mod tests { #[test] fn memtable_grows_in_size_after_put() { - let memtable = Memtable::default(); + let memtable = Memtable::new(1); let _ = memtable.put(b"1", b"2"); assert_eq!(2, memtable.get_size()); @@ -75,7 +86,7 @@ mod tests { #[test] #[should_panic] fn key_not_found() { - let memtable = Memtable::default(); + let memtable = Memtable::new(1); let _ = memtable.put(b"1", b"2"); let out = &memtable.get(b"5").unwrap()[..]; diff --git a/storage/tests/storage.rs b/storage/tests/storage.rs index dd3cf96..4e5c25a 100644 --- a/storage/tests/storage.rs +++ b/storage/tests/storage.rs @@ -1,10 +1,19 @@ use cabin_storage::Config; use cabin_storage::common::iterator::StorageIterator; use std::ops::Bound::Unbounded; +use tempfile::tempdir; + +use crate::util::file::get_temp_dir; + +mod util; #[test] fn get_returns_latest_entry() { - let config = Config { sst_size: 2 }; + let config = Config { + sst_size: 2, + block_size: 32, + db_dir: get_temp_dir(), + }; let storage = cabin_storage::new(config); let entries = vec![ (b"age", b"20"), @@ -23,7 +32,11 @@ fn get_returns_latest_entry() { #[test] fn can_read_frozen_memtable() { - let config = Config { sst_size: 2 }; + let config = Config { + sst_size: 2, + block_size: 32, + db_dir: get_temp_dir(), + }; let storage = cabin_storage::new(config); let entries = vec![(b"1", b"20"), (b"2", b"21"), (b"3", b"22"), (b"4", b"23")]; @@ -38,7 +51,11 @@ fn can_read_frozen_memtable() { #[test] #[should_panic] fn get_invalid_key() { - let config = Config { sst_size: 2 }; + let config = Config { + sst_size: 2, + block_size: 32, + db_dir: String::from(tempdir().unwrap().path().to_str().unwrap()), + }; let storage = cabin_storage::new(config); storage.get(b"1").unwrap(); @@ -46,7 +63,11 @@ fn get_invalid_key() { #[test] fn scan_items() { - let config = Config { sst_size: 10 }; + let config = Config { + sst_size: 10, + block_size: 32, + db_dir: get_temp_dir(), + }; let storage = cabin_storage::new(config); let entries = vec![ (b"e", b"4"), diff --git a/storage/tests/two_merge_iter.rs b/storage/tests/two_merge_iter.rs index 07b9e1f..66f4df8 100644 --- a/storage/tests/two_merge_iter.rs +++ b/storage/tests/two_merge_iter.rs @@ -100,7 +100,7 @@ fn create_iterators(items: Vec>) -> Vec String { + String::from(tempdir().unwrap().path().to_str().unwrap()) +} diff --git a/storage/tests/util/mod.rs b/storage/tests/util/mod.rs new file mode 100644 index 0000000..2e172cd --- /dev/null +++ b/storage/tests/util/mod.rs @@ -0,0 +1 @@ +pub mod file;