diff --git a/Cargo.lock b/Cargo.lock index 9b40d288..ac6a61fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1417,14 +1417,13 @@ dependencies = [ [[package]] name = "librocksdb-sys" -version = "0.16.0+8.10.0" +version = "0.17.1+9.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce3d60bc059831dc1c83903fb45c103f75db65c5a7bf22272764d9cc683e348c" +checksum = "2b7869a512ae9982f4d46ba482c2a304f1efd80c6412a3d4bf57bb79a619679f" dependencies = [ "bindgen 0.69.5", "bzip2-sys", "cc", - "glob", "libc", "libz-sys", "lz4-sys", @@ -2176,9 +2175,9 @@ dependencies = [ [[package]] name = "rocksdb" -version = "0.22.0" +version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bd13e55d6d7b8cd0ea569161127567cd587676c99f4472f779a0279aa60a7a7" +checksum = "26ec73b20525cb235bad420f911473b69f9fe27cc856c5461bccd7e4af037f43" dependencies = [ "libc", "librocksdb-sys", diff --git a/Cargo.toml b/Cargo.toml index 3cdd62c8..bc8b854b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,7 +51,7 @@ parking_lot = { version = "0.12", features = ["arc_lock"] } petgraph = { version = "0.6" } recursive = { version = "0.1" } regex = { version = "1" } -rocksdb = { version = "0.22" } +rocksdb = { version = "0.23" } rust_decimal = { version = "1" } serde = { version = "1", features = ["derive", "rc"] } kite_sql_serde_macros = { version = "0.1.0", path = "kite_sql_serde_macros" } diff --git a/docs/features.md b/docs/features.md index 49d859f7..9fc4aae0 100644 --- a/docs/features.md +++ b/docs/features.md @@ -70,6 +70,7 @@ let kite_sql = DataBaseBuilder::path("./data") - Volcano ### MVCC Transaction +- Pessimistic (Default) - Optimistic ### Field options diff --git a/examples/transaction.rs b/examples/transaction.rs index c5f3d908..ef2834b9 100644 --- a/examples/transaction.rs +++ b/examples/transaction.rs @@ -2,7 +2,7 @@ use kite_sql::db::{DataBaseBuilder, ResultIter}; use kite_sql::errors::DatabaseError; fn main() -> Result<(), DatabaseError> { - let database = DataBaseBuilder::path("./transaction").build()?; + let database = DataBaseBuilder::path("./transaction").build_optimistic()?; let mut transaction = database.new_transaction()?; transaction diff --git a/src/db.rs b/src/db.rs index ff08a4fa..4b0a3359 100644 --- a/src/db.rs +++ b/src/db.rs @@ -17,7 +17,7 @@ use crate::optimizer::rule::implementation::ImplementationRuleImpl; use crate::optimizer::rule::normalization::NormalizationRuleImpl; use crate::parser::parse_sql; use crate::planner::LogicalPlan; -use crate::storage::rocksdb::RocksStorage; +use crate::storage::rocksdb::{OptimisticRocksStorage, RocksStorage}; use crate::storage::{StatisticsMetaCache, Storage, TableCache, Transaction, ViewCache}; use crate::types::tuple::{SchemaRef, Tuple}; use crate::types::value::DataValue; @@ -86,6 +86,21 @@ impl DataBaseBuilder { pub fn build(self) -> Result, DatabaseError> { let storage = RocksStorage::new(self.path)?; + + Self::_build::(storage, self.scala_functions, self.table_functions) + } + + pub fn build_optimistic(self) -> Result, DatabaseError> { + let storage = OptimisticRocksStorage::new(self.path)?; + + Self::_build::(storage, self.scala_functions, self.table_functions) + } + + fn _build( + storage: T, + scala_functions: ScalaFunctions, + table_functions: TableFunctions, + ) -> Result, DatabaseError> { let meta_cache = SharedLruCache::new(256, 8, RandomState::new())?; let table_cache = SharedLruCache::new(48, 4, RandomState::new())?; let view_cache = SharedLruCache::new(12, 4, RandomState::new())?; @@ -94,8 +109,8 @@ impl DataBaseBuilder { storage, mdl: Default::default(), state: Arc::new(State { - scala_functions: self.scala_functions, - table_functions: self.table_functions, + scala_functions, + table_functions, meta_cache, table_cache, view_cache, @@ -662,6 +677,53 @@ pub(crate) mod test { tx_1.run("insert into t1 values(0, 0)")?.done()?; tx_1.run("insert into t1 values(1, 1)")?.done()?; + assert!(tx_2.run("insert into t1 values(0, 0)")?.done().is_err()); + tx_2.run("insert into t1 values(3, 3)")?.done()?; + + let mut iter_1 = tx_1.run("select * from t1")?; + let mut iter_2 = tx_2.run("select * from t1")?; + + assert_eq!( + iter_1.next().unwrap()?.values, + vec![DataValue::Int32(0), DataValue::Int32(0)] + ); + assert_eq!( + iter_1.next().unwrap()?.values, + vec![DataValue::Int32(1), DataValue::Int32(1)] + ); + + assert_eq!( + iter_2.next().unwrap()?.values, + vec![DataValue::Int32(3), DataValue::Int32(3)] + ); + drop(iter_1); + drop(iter_2); + + tx_1.commit()?; + tx_2.commit()?; + + let mut tx_3 = kite_sql.new_transaction()?; + let res = tx_3.run("create table t2 (a int primary key, b int)"); + assert!(res.is_err()); + + Ok(()) + } + + #[test] + fn test_optimistic_transaction_sql() -> Result<(), DatabaseError> { + let temp_dir = TempDir::new().expect("unable to create temporary working directory"); + let kite_sql = DataBaseBuilder::path(temp_dir.path()).build_optimistic()?; + + kite_sql + .run("create table t1 (a int primary key, b int)")? + .done()?; + + let mut tx_1 = kite_sql.new_transaction()?; + let mut tx_2 = kite_sql.new_transaction()?; + + tx_1.run("insert into t1 values(0, 0)")?.done()?; + tx_1.run("insert into t1 values(1, 1)")?.done()?; + tx_2.run("insert into t1 values(0, 0)")?.done()?; tx_2.run("insert into t1 values(3, 3)")?.done()?; diff --git a/src/storage/rocksdb.rs b/src/storage/rocksdb.rs index d3ac6e6b..770e85f3 100644 --- a/src/storage/rocksdb.rs +++ b/src/storage/rocksdb.rs @@ -2,29 +2,37 @@ use crate::errors::DatabaseError; use crate::storage::table_codec::{BumpBytes, Bytes, TableCodec}; use crate::storage::{InnerIter, Storage, Transaction}; use rocksdb::{ - DBIteratorWithThreadMode, Direction, IteratorMode, OptimisticTransactionDB, SliceTransform, + DBIteratorWithThreadMode, Direction, IteratorMode, OptimisticTransactionDB, Options, + SliceTransform, TransactionDB, }; use std::collections::Bound; use std::path::PathBuf; use std::sync::Arc; #[derive(Clone)] -pub struct RocksStorage { +pub struct OptimisticRocksStorage { pub inner: Arc, } -impl RocksStorage { +impl OptimisticRocksStorage { pub fn new(path: impl Into + Send) -> Result { - let mut bb = rocksdb::BlockBasedOptions::default(); - bb.set_block_cache(&rocksdb::Cache::new_lru_cache(40 * 1_024 * 1_024)); - bb.set_whole_key_filtering(false); + let storage = OptimisticTransactionDB::open(&default_opts(), path.into())?; - let mut opts = rocksdb::Options::default(); - opts.set_block_based_table_factory(&bb); - opts.create_if_missing(true); - opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(4)); + Ok(OptimisticRocksStorage { + inner: Arc::new(storage), + }) + } +} - let storage = OptimisticTransactionDB::open(&opts, path.into())?; +#[derive(Clone)] +pub struct RocksStorage { + pub inner: Arc, +} + +impl RocksStorage { + pub fn new(path: impl Into + Send) -> Result { + let txn_opts = rocksdb::TransactionDBOptions::default(); + let storage = TransactionDB::open(&default_opts(), &txn_opts, path.into())?; Ok(RocksStorage { inner: Arc::new(storage), @@ -32,6 +40,32 @@ impl RocksStorage { } } +fn default_opts() -> Options { + let mut bb = rocksdb::BlockBasedOptions::default(); + bb.set_block_cache(&rocksdb::Cache::new_lru_cache(40 * 1_024 * 1_024)); + bb.set_whole_key_filtering(false); + + let mut opts = rocksdb::Options::default(); + opts.set_block_based_table_factory(&bb); + opts.create_if_missing(true); + opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(4)); + opts +} + +impl Storage for OptimisticRocksStorage { + type TransactionType<'a> + = OptimisticRocksTransaction<'a> + where + Self: 'a; + + fn transaction(&self) -> Result, DatabaseError> { + Ok(OptimisticRocksTransaction { + tx: self.inner.transaction(), + table_codec: Default::default(), + }) + } +} + impl Storage for RocksStorage { type TransactionType<'a> = RocksTransaction<'a> @@ -46,112 +80,147 @@ impl Storage for RocksStorage { } } -pub struct RocksTransaction<'db> { +pub struct OptimisticRocksTransaction<'db> { tx: rocksdb::Transaction<'db, OptimisticTransactionDB>, table_codec: TableCodec, } -impl<'txn> Transaction for RocksTransaction<'txn> { - type IterType<'iter> - = RocksIter<'txn, 'iter> - where - Self: 'iter; +pub struct RocksTransaction<'db> { + tx: rocksdb::Transaction<'db, TransactionDB>, + table_codec: TableCodec, +} - #[inline] - fn table_codec(&self) -> *const TableCodec { - &self.table_codec - } +#[macro_export] +macro_rules! impl_transaction { + ($tx:ident, $iter:ident) => { + impl<'txn> Transaction for $tx<'txn> { + type IterType<'iter> + = $iter<'txn, 'iter> + where + Self: 'iter; + + #[inline] + fn table_codec(&self) -> *const TableCodec { + &self.table_codec + } - #[inline] - fn get(&self, key: &[u8]) -> Result, DatabaseError> { - Ok(self.tx.get(key)?) - } + #[inline] + fn get(&self, key: &[u8]) -> Result, DatabaseError> { + Ok(self.tx.get(key)?) + } - #[inline] - fn set(&mut self, key: BumpBytes, value: BumpBytes) -> Result<(), DatabaseError> { - self.tx.put(key, value)?; + #[inline] + fn set(&mut self, key: BumpBytes, value: BumpBytes) -> Result<(), DatabaseError> { + self.tx.put(key, value)?; - Ok(()) - } + Ok(()) + } - #[inline] - fn remove(&mut self, key: &[u8]) -> Result<(), DatabaseError> { - self.tx.delete(key)?; + #[inline] + fn remove(&mut self, key: &[u8]) -> Result<(), DatabaseError> { + self.tx.delete(key)?; - Ok(()) - } + Ok(()) + } - // Tips: rocksdb has weak support for `Include` and `Exclude`, so precision will be lost - #[inline] - fn range<'a>( - &'a self, - min: Bound>, - max: Bound>, - ) -> Result, DatabaseError> { - let min = match min { - Bound::Included(bytes) => Some(bytes), - Bound::Excluded(mut bytes) => { - // the prefix is the same, but the length is larger - bytes.push(0u8); - Some(bytes) + // Tips: rocksdb has weak support for `Include` and `Exclude`, so precision will be lost + #[inline] + fn range<'a>( + &'a self, + min: Bound>, + max: Bound>, + ) -> Result, DatabaseError> { + let min = match min { + Bound::Included(bytes) => Some(bytes), + Bound::Excluded(mut bytes) => { + // the prefix is the same, but the length is larger + bytes.push(0u8); + Some(bytes) + } + Bound::Unbounded => None, + }; + let lower = min + .as_ref() + .map(|bytes| IteratorMode::From(bytes, Direction::Forward)) + .unwrap_or(IteratorMode::Start); + + if let (Some(min_bytes), Bound::Included(max_bytes) | Bound::Excluded(max_bytes)) = + (&min, &max) + { + let len = min_bytes + .iter() + .zip(max_bytes.iter()) + .take_while(|(x, y)| x == y) + .count(); + + debug_assert!(len > 0); + let mut iter = self.tx.prefix_iterator(&min_bytes[..len]); + iter.set_mode(lower); + + return Ok($iter { upper: max, iter }); + } + let iter = self.tx.iterator(lower); + + Ok($iter { upper: max, iter }) + } + + fn commit(self) -> Result<(), DatabaseError> { + self.tx.commit()?; + Ok(()) } - Bound::Unbounded => None, - }; - let lower = min - .as_ref() - .map(|bytes| IteratorMode::From(bytes, Direction::Forward)) - .unwrap_or(IteratorMode::Start); - - if let (Some(min_bytes), Bound::Included(max_bytes) | Bound::Excluded(max_bytes)) = - (&min, &max) - { - let len = min_bytes - .iter() - .zip(max_bytes.iter()) - .take_while(|(x, y)| x == y) - .count(); - - debug_assert!(len > 0); - let mut iter = self.tx.prefix_iterator(&min_bytes[..len]); - iter.set_mode(lower); - - return Ok(RocksIter { upper: max, iter }); } - let iter = self.tx.iterator(lower); + }; +} - Ok(RocksIter { upper: max, iter }) - } +impl_transaction!(RocksTransaction, RocksIter); +impl_transaction!(OptimisticRocksTransaction, OptimisticRocksIter); - fn commit(self) -> Result<(), DatabaseError> { - self.tx.commit()?; - Ok(()) +pub struct OptimisticRocksIter<'txn, 'iter> { + upper: Bound>, + iter: DBIteratorWithThreadMode<'iter, rocksdb::Transaction<'txn, OptimisticTransactionDB>>, +} + +impl InnerIter for OptimisticRocksIter<'_, '_> { + #[inline] + fn try_next(&mut self) -> Result, DatabaseError> { + if let Some(result) = self.iter.by_ref().next() { + return next(self.upper.as_ref(), result?); + } + Ok(None) } } pub struct RocksIter<'txn, 'iter> { upper: Bound>, - iter: DBIteratorWithThreadMode<'iter, rocksdb::Transaction<'txn, OptimisticTransactionDB>>, + iter: DBIteratorWithThreadMode<'iter, rocksdb::Transaction<'txn, TransactionDB>>, } impl InnerIter for RocksIter<'_, '_> { #[inline] fn try_next(&mut self) -> Result, DatabaseError> { if let Some(result) = self.iter.by_ref().next() { - let (key, value) = result?; - let upper_bound_check = match &self.upper { - Bound::Included(ref upper) => key.as_ref() <= upper.as_slice(), - Bound::Excluded(ref upper) => key.as_ref() < upper.as_slice(), - Bound::Unbounded => true, - }; - if !upper_bound_check { - return Ok(None); - } - return Ok(Some((Vec::from(key), Vec::from(value)))); + return next(self.upper.as_ref(), result?); } Ok(None) } } +#[inline] +fn next( + upper: Bound<&BumpBytes<'_>>, + (key, value): (Box<[u8]>, Box<[u8]>), +) -> Result, DatabaseError> { + let upper_bound_check = match upper { + Bound::Included(upper) => key.as_ref() <= upper.as_slice(), + Bound::Excluded(upper) => key.as_ref() < upper.as_slice(), + Bound::Unbounded => true, + }; + if !upper_bound_check { + return Ok(None); + } + Ok(Some((Vec::from(key), Vec::from(value)))) +} + #[cfg(test)] mod test { use crate::catalog::{ColumnCatalog, ColumnDesc, ColumnRef};