diff --git a/Cargo.toml b/Cargo.toml index 3c8d65f..139afcc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ resolver = "2" -members = ["edfsm", "edfsm-macros", "edfsm-kv-store", "edfsm-machine"] +members = ["edfsm", "edfsm-macros", "edfsm-kv-store", "edfsm-machine", "edfsm-persistance"] [workspace.package] description = "Event Driven Finite State Machine library" @@ -20,6 +20,7 @@ futures-util = "0.3" proc-macro2 = "1" proc-macro-error = "1" quote = "1" +rusqlite = "0.34" serde = "1" serde_json = "1" serde_qs = "0.13" diff --git a/edfsm-persistance/Cargo.toml b/edfsm-persistance/Cargo.toml new file mode 100644 index 0000000..c3ab1aa --- /dev/null +++ b/edfsm-persistance/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "edfsm-persistance" +description = "Store latest events in sqlite" +readme = "README.md" + +version.workspace = true +edition.workspace = true +license.workspace = true +repository.workspace = true + +[dependencies] +derive_more = { workspace = true, features = ["try_into"] } +edfsm.workspace = true +edfsm-machine.workspace = true +rusqlite.workspace = true +serde.workspace = true +serde_json.workspace = true +tokio = { workspace = true, features = ["full"] } diff --git a/edfsm-persistance/README.md b/edfsm-persistance/README.md new file mode 100644 index 0000000..099b3b0 --- /dev/null +++ b/edfsm-persistance/README.md @@ -0,0 +1,25 @@ +# Store events in sqlite + +This is an alternative to [streambed]() to provide persistance to edfsm state machines. An SQLite database is interfaced directly with `Machine` via its adapter traits. The aim is to leverage many advantages of SQLite. The tradeoff is more limited log compaction options and no interprocess communication. With those limitations, the simplicity of the idea is clear. This proof of concept stands at around 200 loc. The log is just a single file database. + +## How it Works + +Events are stored in two tables. 
New events are inserted into `log` which has `offset` as an _integer primary key_. SQLite manages this special column and thereby keeps track of the current event offset automatically. It can efficiently present events in offset order without needing an index.
+
+A second table, `compacted`, stores events by their compaction key. Compaction is performed by an SQL statement that copies events in offset order from the `log` to `compacted`. Key conflicts are handled by accepting the most recent log record, which is the event with the greatest offset. After compaction the log is trimmed with an SQL delete statement.
+
+The compaction policy resulting from this is to retain one event with each compaction key. Events are replayed by querying the `log` table together with the events in `compacted` that are older than the events in `log`.
+
+## Types and Traits
+
+A `BackingStore` wraps an SQLite connection and other bookkeeping information. It implements the functionality described so far. An `AsyncBackingStore` wraps a `BackingStore` in a mutex. This type can be passed to a `Machine` to act as a persistent log. It implements the necessary traits, `Adapter` and `Feed`.
+
+## Rationale and Alternatives
+
+The idea of implementing this in streambed was explored and that remains a possibility. The approach would be to implement `CommitLog`. Instead this store implements `Adapter` and `Feed`, which are more minimalist traits. An important function of streambed is as an interprocess communication channel similar to _Kafka_. This is not supported here at all and is not an SQLite strength. Many streambed concerns are avoided, including topics, groups and the metadata defined in `ProducerRecord`.
+
+The approach to async operation implemented here is about as primitive as can be. `AsyncBackingStore` simply wraps the database connection in a mutex. There is no buffering or streaming of events. For the most part this seems acceptable. The `Machine` will generally be surrounded by queues.
One weakness is that, without streaming, event replay materializes the entire event history in memory. + +## Advantages of SQLite + +SQLite has emerged as a universal persistance solution. We expect an SQLite file to be consistent no matter what and to be readable anywhere up to and maybe after an apocolypse. It is also very fast. diff --git a/edfsm-persistance/src/error.rs b/edfsm-persistance/src/error.rs new file mode 100644 index 0000000..e88a79f --- /dev/null +++ b/edfsm-persistance/src/error.rs @@ -0,0 +1,17 @@ +use derive_more::From; + +/// Result type for this module +pub type Result = core::result::Result; + +/// Error type for this module +#[derive(Debug, From)] +pub enum Error { + Sqlite(rusqlite::Error), + Serde(serde_json::Error), +} + +impl From for edfsm_machine::error::Error { + fn from(_value: Error) -> Self { + edfsm_machine::error::Error::ChannelClosed + } +} diff --git a/edfsm-persistance/src/lib.rs b/edfsm-persistance/src/lib.rs new file mode 100644 index 0000000..8ad02ca --- /dev/null +++ b/edfsm-persistance/src/lib.rs @@ -0,0 +1,243 @@ +pub mod error; +use edfsm_machine::adapter::{Adapter, Feed}; +use edfsm_machine::error as mach_error; +pub use error::Result; +use rusqlite::{Connection, OptionalExtension, Params}; +use serde::{de::DeserializeOwned, Serialize}; +use std::{marker::PhantomData, ops::Range, path::Path, usize}; +use tokio::{sync::Mutex, task::block_in_place}; + +pub trait Persistable +where + Self: Serialize, + Self::Key: Serialize, +{ + /// The type of the compaction key. + type Key; + + /// The compaction key for this event. + fn compaction_key(&self) -> Self::Key; + + // On receipt of this event it and all preceding buffered events should be persisted. 
+ // fn checkpoint(&self) -> bool; +} + +#[derive(Debug)] +pub struct BackingStore { + connection: Connection, + log_range: Range, + last_compact_offset: Option, + log_low_level: usize, + log_high_level: usize, + marker: PhantomData, +} + +impl BackingStore { + pub fn new( + path: impl AsRef, + low_level: usize, + high_level: usize, + ) -> Result> { + // clamp high and low log levels to valid range + let log_low_level = low_level.max(1).min(usize::MAX - 2); + let log_high_level = high_level.max(log_low_level + 2); + + // create or open the database + let connection = Connection::open(path)?; + Self::create_tables(&connection)?; + let log_range = Self::query_log_offsets(&connection)?; + let last_compact_offset = Self::query_compact_offset(&connection)?; + + let store = Self { + connection, + log_range, + last_compact_offset, + log_low_level, + log_high_level, + marker: PhantomData, + }; + + Ok(store) + } + + const INSERT_LOG: &str = "INSERT INTO log (key, value) VALUES (?, ?)"; + + pub fn produce(&mut self, item: A) -> Result<()> + where + A: Persistable, + { + let key = serde_json::to_string(&item.compaction_key())?; + let value = serde_json::to_string(&item)?; + + let mut statement = self.connection.prepare_cached(Self::INSERT_LOG)?; + statement.execute((&*key, &*value))?; + let offset = self.connection.last_insert_rowid(); + drop(statement); + + if self.log_range.is_empty() { + self.log_range.start = offset; + } + self.log_range.end = offset + 1; + + if self.log_range.end - self.log_range.start > self.log_high_level as i64 { + self.compact()?; + } + + Ok(()) + } + + const COMPACT_LOG_TAIL: &str = "INSERT INTO compacted (key, offset, value) + SELECT key, offset, value FROM log ORDER by offset WHERE offset > ?"; + + const COMPACT_LOG_ALL: &str = "INSERT INTO compacted (key, offset, value) + SELECT key, offset, value FROM log ORDER by offset"; + + const TRIM_LOG: &str = "DELETE FROM log where offset < ?"; + + pub fn compact(&mut self) -> Result<()> { + if 
!self.log_range.is_empty() { + let last_log_offset = self.log_range.end - 1; + + if let Some(offset) = self.last_compact_offset { + if last_log_offset > offset { + let mut statement = self.connection.prepare_cached(Self::COMPACT_LOG_TAIL)?; + statement.execute((offset,))?; + self.last_compact_offset = Some(last_log_offset); + } + } else { + let mut statement = self.connection.prepare_cached(Self::COMPACT_LOG_ALL)?; + statement.execute(())?; + self.last_compact_offset = Some(last_log_offset); + } + + if self.log_range.end - self.log_range.start > self.log_high_level as i64 { + let preserve = self.log_range.end - self.log_low_level as i64; + let mut statement = self.connection.prepare_cached(Self::TRIM_LOG)?; + statement.execute((preserve,))?; + self.log_range.start = preserve; + } + } + Ok(()) + } + + const SELECT_LOG: &str = "SELECT value FROM log ORDER BY offset"; + const SELECT_COMPACT_ALL: &str = "SELECT value FROM compact ORDER BY offset"; + const SELECT_COMPACT_TAIL: &str = "SELECT value FROM compact ORDER BY offset WHERE offset > ?"; + + fn query_events

(&self, sql: &str, params: P, values: &mut Vec) -> Result<()> + where + A: DeserializeOwned, + P: Params, + { + let mut statement: rusqlite::CachedStatement<'_> = self.connection.prepare_cached(sql)?; + let mut rows = statement.query(params)?; + + while let Some(row) = rows.next()? { + let text: String = row.get(0)?; + let item: A = serde_json::from_str(&*text)?; + values.push(item); + } + Ok(()) + } + + pub fn history(&mut self) -> Result> + where + A: DeserializeOwned, + { + let mut values: Vec = Vec::new(); + + if self.log_range.is_empty() { + self.query_events(Self::SELECT_COMPACT_ALL, (), &mut values)?; + } else { + let breakpoint = self.log_range.end - 1; + self.query_events(Self::SELECT_LOG, (), &mut values)?; + self.query_events(Self::SELECT_COMPACT_TAIL, (breakpoint,), &mut values)?; + } + + Ok(values) + } + + const CREATE_LOG: &str = "CREATE TABLE IF NOT EXISTS log ( + offset INTEGER PRIMARY KEY, + key TEXT, + value TEXT + + )"; + + const CREATE_COMPACT: &str = "CREATE TABLE IF NOT EXISTS compacted ( + key TEXT PRIMARY KEY ON CONFLICT REPLACE, + offset INTEGER, + value TEXT + + )"; + + fn create_tables(connection: &Connection) -> Result<()> { + connection.execute(Self::CREATE_LOG, ())?; + connection.execute(Self::CREATE_COMPACT, ())?; + Ok(()) + } + + const QUERY_LOG_OFFSETS: &str = "SELECT MIN(offset), MAX(offset) FROM log"; + + fn query_log_offsets(connection: &Connection) -> Result> { + let mut statement = connection.prepare_cached(Self::QUERY_LOG_OFFSETS)?; + let values = statement + .query_row((), |row| { + let start: i64 = row.get(0)?; + let last: i64 = row.get(1)?; + let end = last + 1; + Ok(Range { start, end }) + }) + .optional()? 
+ .unwrap_or_default(); + Ok(values) + } + + const QUERY_COMPACT_OFFSET: &str = "SELECT MAX(offset) FROM compact"; + + fn query_compact_offset(connection: &Connection) -> Result> { + let mut statement = connection.prepare_cached(Self::QUERY_COMPACT_OFFSET)?; + let offset = statement.query_row((), |row| row.get(0)).optional()?; + Ok(offset) + } +} + +#[derive(Debug)] +pub struct AsyncBackingStore(Mutex>); + +impl AsyncBackingStore { + pub fn new(store: BackingStore) -> Self { + Self(Mutex::new(store)) + } +} + +impl Feed for AsyncBackingStore +where + A: DeserializeOwned + Send + Sync + 'static, +{ + type Item = A; + + async fn feed(&self, sink: &mut impl Adapter) -> mach_error::Result<()> { + let mut store = self.0.lock().await; + let values = block_in_place(|| { + store.compact()?; + store.history() + })?; + for item in values { + sink.notify(item).await; + } + Ok(()) + } +} + +impl Adapter for AsyncBackingStore +where + A: Send + Sync + Persistable, +{ + type Item = A; + + async fn notify(&mut self, item: Self::Item) { + let mut store = self.0.lock().await; + let _ = block_in_place(|| store.produce(item)); + } +}