Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,6 @@ target
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

# cabin artifacts
.cabin
2 changes: 1 addition & 1 deletion storage/src/iterators/merged_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ mod test {
let mut res = vec![];

for item in items {
let memtable = Memtable::default();
let memtable = Memtable::new(0);

for (key, val) in item {
let _ = memtable.put(key, val);
Expand Down
1 change: 1 addition & 0 deletions storage/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod common;
pub mod iterators;
pub mod lsm_storage;
mod lsm_util;
pub mod memtable;

mod block;
Expand Down
243 changes: 234 additions & 9 deletions storage/src/lsm_storage.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
use std::{
collections::HashMap,
ops::Bound,
sync::{Arc, Mutex, MutexGuard, RwLock},
path::Path,
sync::{
Arc, Mutex, MutexGuard, RwLock,
atomic::{AtomicUsize, Ordering::SeqCst},
},
};

use crate::{
SSTable, SSTableIterator,
SSTable, SSTableBuilder, SSTableIterator,
common::{errors::KeyNotFound, iterator::StorageIterator},
iterators::{
lsm_iterator::LsmIterator, merged_iterator::MergedIterator,
two_merge_iterator::TwoMergeIterator,
},
lsm_util::{create_db_dir, load_sstables},
memtable::{memtable_iterator::map_bound, table::Memtable},
sst::BlockCache,
};
Expand All @@ -22,6 +27,7 @@ pub struct Storage {
state: RwLock<Arc<StorageState>>,
block_cache: Arc<BlockCache>,
state_lock: Mutex<()>,
sst_id: AtomicUsize,
config: Config,
}

Expand All @@ -36,18 +42,31 @@ struct StorageState {
/// User-supplied settings for a storage instance created with `new`.
#[derive(Debug)]
pub struct Config {
// NOTE(review): presumably the size limit at which the active memtable is
// frozen; the code that reads this field is not visible here — confirm.
pub sst_size: usize,
/// Block size passed to `SSTableBuilder::new` when a frozen memtable is flushed.
pub block_size: usize,
/// Database root directory. It is handed to `create_db_dir` / `load_sstables`
/// on start-up, and SSTable files are addressed as `<db_dir>/sst/<id>.sst`.
pub db_dir: String,
}

/// Opens the storage rooted at `config.db_dir`.
///
/// The directory is prepared with `create_db_dir`, SSTables already on disk
/// are picked up with `load_sstables`, and the SST id counter starts one past
/// the largest loaded level-0 id (or at `0` when none were loaded). The
/// initial active memtable is created with that same id.
///
/// # Panics
///
/// Panics if `load_sstables` returns an error.
pub fn new(config: Config) -> Storage {
    let db_dir = Path::new(&config.db_dir);
    create_db_dir(db_dir);

    // A single cache is shared between the SSTables loaded here and the ones
    // built later by flushes (which use `Storage::block_cache`). Previously a
    // second, independent cache was created for the `Storage` field.
    // NOTE(review): the original comment described `1 << 20` as "4gb";
    // the unit of the capacity argument is not visible here — confirm.
    let block_cache = Arc::new(BlockCache::new(1 << 20));
    let (l0_sstables, sstables) =
        load_sstables(db_dir, Arc::clone(&block_cache)).expect("loaded sstables");

    let sst_id = l0_sstables.iter().max().map_or(0, |id| id + 1);

    Storage {
        sst_id: AtomicUsize::new(sst_id),
        state_lock: Mutex::new(()),
        block_cache,
        state: RwLock::new(Arc::new(StorageState {
            memtable: Arc::new(Memtable::new(sst_id)),
            frozen_memtables: Vec::new(),
            l0_sstables,
            sstables,
        })),
        config,
    }
}
Expand All @@ -74,7 +93,7 @@ impl Storage {

let mut res = state.memtable.get(key);

// search through frozen memtables
// search in frozen memtables
if res.is_none() {
for frozen_table in state.frozen_memtables.clone() {
if frozen_table.get(key).is_some() {
Expand All @@ -84,11 +103,15 @@ impl Storage {
}
}

// search through ssts
// search in ssts
if res.is_none() {
let mut table_iters = Vec::with_capacity(state.l0_sstables.len());
for table_id in state.l0_sstables.iter() {
let table = state.sstables[table_id].clone();
if key < table.first_key() || key > table.last_key() {
continue;
}

let iter = SSTableIterator::create_and_seek_to_key(table, key).unwrap();
table_iters.push(iter);
}
Expand Down Expand Up @@ -165,8 +188,11 @@ impl Storage {
let mut frozen_memtables = guard.frozen_memtables.clone();
frozen_memtables.insert(0, memtable);

self.inc_sst_id();
let id = self.sst_id.load(SeqCst);

*guard = Arc::new(StorageState {
memtable: Arc::new(Memtable::default()),
memtable: Arc::new(Memtable::new(id)),
frozen_memtables,
l0_sstables: guard.l0_sstables.clone(),
sstables: guard.sstables.clone(),
Expand All @@ -175,15 +201,64 @@ impl Storage {
drop(guard);
}
}

/// Flushes one frozen memtable to disk and registers it as a level-0 SSTable.
///
/// The memtable at the back of `frozen_memtables` is taken (new frozen tables
/// are inserted at index 0, so the back holds the oldest one). Its contents
/// are written through an `SSTableBuilder`, and the resulting table is stored
/// under the memtable's id. The state is then replaced with a snapshot that
/// no longer contains the flushed memtable.
///
/// Returns `Ok(())` without touching the state when there is nothing to
/// flush. Errors from `Memtable::flush` or `SSTableBuilder::build` are
/// propagated and leave the state unchanged.
fn flush_frozen_memtable(&self) -> Result<()> {
    let mut builder = SSTableBuilder::new(self.config.block_size);
    // The write lock is held for the whole flush, including the build step.
    let mut state = self.state.write().unwrap();

    let mut remaining = state.frozen_memtables.clone();
    let mut l0_ids = state.l0_sstables.clone();
    let mut tables = state.sstables.clone();

    let oldest = match remaining.pop() {
        Some(table) => table,
        None => return Ok(()),
    };

    oldest.flush(&mut builder)?;

    let table_id = oldest.id;
    let path = self.sst_path(table_id);
    let sst = builder.build(table_id, self.block_cache.clone(), path)?;

    l0_ids.push(table_id);
    tables.insert(table_id, Arc::new(sst));

    let active = state.memtable.clone();
    *state = Arc::new(StorageState {
        memtable: active,
        frozen_memtables: remaining,
        l0_sstables: l0_ids,
        sstables: tables,
    });

    Ok(())
}

fn inc_sst_id(&self) -> usize {
self.sst_id.fetch_add(1, SeqCst)
}

/// Builds the on-disk location of the SSTable with the given `id`:
/// `<db_dir>/sst/<id>.sst`.
fn sst_path(&self, id: usize) -> String {
    let db_dir = &self.config.db_dir;
    format!("{db_dir}/sst/{id}.sst")
}
}

#[cfg(test)]
mod tests {
use std::str::from_utf8;

use tempfile::tempdir;

use crate::lsm_util::{self, get_entries};

use super::*;

#[test]
fn filled_up_memtables_are_frozen() {
let config = Config { sst_size: 4 };
let config = Config {
sst_size: 4,
block_size: 32,
db_dir: String::from(tempdir().unwrap().path().to_str().unwrap()),
};
let storage = new(config);

let input = vec![b"1", b"2", b"3", b"4", b"5"];
Expand All @@ -194,4 +269,154 @@ mod tests {
assert_eq!(2, storage.state.read().unwrap().frozen_memtables.len());
assert_eq!(2, storage.state.read().unwrap().memtable.get_size());
}

/// Flushing once removes exactly one frozen memtable from the state.
#[test]
fn can_flush_frozen_memtable() {
    // Keep the `TempDir` guard alive for the whole test. Calling
    // `tempdir().unwrap().path()` inline drops the guard at the end of the
    // statement, which deletes the directory; `new` then recreates the path
    // and nothing ever cleans it up. The database lives in a not-yet-existing
    // child so `create_db_dir` still starts from a missing directory.
    let tmp = tempdir().unwrap();
    let config = Config {
        sst_size: 4,
        block_size: 32,
        db_dir: String::from(tmp.path().join("db").to_str().unwrap()),
    };
    let storage = new(config);

    let input = vec![b"1", b"2", b"3", b"4", b"5"];
    for entry in input {
        storage.put(entry, entry).unwrap();
    }

    assert_eq!(2, storage.state.read().unwrap().frozen_memtables.len());
    assert_eq!(2, storage.state.read().unwrap().memtable.get_size());

    storage
        .flush_frozen_memtable()
        .expect("memtable was frozen");
    assert_eq!(1, storage.state.read().unwrap().frozen_memtables.len());
}

/// An SSTable flushed by one storage instance is discovered by a second
/// instance opened on the same directory.
#[test]
fn loads_sstables() {
    // Hold the `TempDir` guard until the end of the test. An inline
    // `tempdir().unwrap().path()` drops the guard immediately, deleting the
    // directory, and the path recreated by `new` would never be removed.
    // A fresh child path keeps `create_db_dir` starting from a missing dir.
    let tmp = tempdir().unwrap();
    let db_dir = String::from(tmp.path().join("db").to_str().unwrap());
    let config = Config {
        sst_size: 4,
        block_size: 32,
        db_dir: db_dir.clone(),
    };
    let storage = new(config);

    let input = vec![b"1", b"2", b"3", b"4", b"5"];
    for entry in input {
        storage.put(entry, entry).unwrap();
    }
    storage
        .flush_frozen_memtable()
        .expect("memtable to have been frozen");

    // new storage instance
    let config = Config {
        sst_size: 4,
        block_size: 32,
        db_dir: db_dir.clone(),
    };

    let storage = new(config);
    assert_eq!(1, storage.state.read().unwrap().l0_sstables.len());
    assert_eq!(0, storage.state.read().unwrap().l0_sstables[0]);
}

/// A freshly opened storage (empty memtables) serves scans from the SSTable
/// left on disk by a previous instance.
#[test]
fn scans_storage_with_empty_memtables_and_filled_sstables() {
    // Hold the `TempDir` guard until the end of the test. An inline
    // `tempdir().unwrap().path()` drops the guard immediately, deleting the
    // directory, and the path recreated by `new` would never be removed.
    // A fresh child path keeps `create_db_dir` starting from a missing dir.
    let tmp = tempdir().unwrap();
    let db_dir = String::from(tmp.path().join("db").to_str().unwrap());
    let config = Config {
        sst_size: 4,
        block_size: 32,
        db_dir: db_dir.clone(),
    };
    let storage = new(config);

    for (key, value) in get_entries() {
        storage.put(key, value).unwrap();
    }
    storage
        .flush_frozen_memtable()
        .expect("memtable to have been frozen");

    // new storage instance
    let config = Config {
        sst_size: 4,
        block_size: 32,
        db_dir: db_dir.clone(),
    };

    let storage = new(config);
    let mut iter = storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap();
    let mut res = vec![];

    while iter.is_valid() {
        res.push(iter.key().to_vec());
        let _ = iter.next();
    }

    assert_eq!(res, vec![b"a", b"b"]);
}

/// A scan merges SSTables loaded from disk with newer writes held in the
/// memtables, and the newer value wins for keys present in both.
#[test]
fn scans_through_filled_memtables_and_sstables() {
    // Hold the `TempDir` guard until the end of the test. An inline
    // `tempdir().unwrap().path()` drops the guard immediately, deleting the
    // directory, and the path recreated by `new` would never be removed.
    // A fresh child path keeps `create_db_dir` starting from a missing dir.
    let tmp = tempdir().unwrap();
    let db_dir = String::from(tmp.path().join("db").to_str().unwrap());
    let config = Config {
        sst_size: 4,
        block_size: 32,
        db_dir: db_dir.clone(),
    };
    let storage = new(config);

    for (key, value) in get_entries() {
        storage.put(key, value).unwrap();
    }

    // will create sstables with a, b, c, d, e & f
    for _ in 0..3 {
        storage
            .flush_frozen_memtable()
            .expect("memtable to have been frozen");
    }

    // new storage instance
    let config = Config {
        sst_size: 4,
        block_size: 32,
        db_dir: db_dir.clone(),
    };

    let new_entries = vec![(b"a", b"20"), (b"e", b"21"), (b"d", b"22"), (b"b", b"23")];
    let storage = new(config);
    for (key, value) in new_entries {
        // Fail loudly on a rejected write, like the other tests, instead of
        // discarding the error and failing later on a confusing assertion.
        storage.put(key, value).unwrap();
    }

    let mut iter = storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap();
    let mut keys = vec![];
    let mut values = vec![];

    while iter.is_valid() {
        let k = from_utf8(iter.key()).unwrap();
        let v = from_utf8(iter.value()).unwrap();

        keys.push(String::from(k));
        values.push(String::from(v));

        let _ = iter.next();
    }

    assert_eq!(keys, vec!["a", "b", "c", "d", "e", "f"]);
    assert_eq!(values, vec!["20", "23", "3", "22", "21", "6"]);
}
}
Loading
Loading