Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ tonbo_macros = { version = "0.3.1", path = "tonbo_macros" }
tracing = "0.1"
ulid = { version = "1", features = ["serde"] }

typed-arrow = { version = "0.2.0", features = ["derive"] }
typed-arrow-dyn = { version = "0.0.0-alpha0" }
typed-arrow-unified = { version = "0.0.0-alpha0" }

# Only used for benchmarks
log = "0.4.22"
redb = { version = "2", optional = true }
Expand Down
23 changes: 15 additions & 8 deletions examples/datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ use fusio::path::Path;
use futures_core::Stream;
use futures_util::StreamExt;
use tokio::fs;
use tonbo::{executor::tokio::TokioExecutor, record::Schema, ArrowArrays, DbOption, DB};
use tonbo_macros::Record;
use tonbo::{
executor::tokio::TokioExecutor, record::Schema, typed as t, ArrowArrays, DbOption, DB,
};

#[derive(Record, Debug)]
#[t::record]
#[derive(Debug)]
pub struct Music {
#[record(primary_key)]
id: u64,
Expand Down Expand Up @@ -75,7 +77,7 @@ impl TableProvider for MusicProvider {
}

fn schema(&self) -> SchemaRef {
MusicSchema {}.arrow_schema().clone()
music_schema().arrow_schema().clone()
}

fn table_type(&self) -> TableType {
Expand Down Expand Up @@ -108,7 +110,7 @@ impl TableProvider for MusicProvider {

impl MusicExec {
fn new(db: Arc<DB<Music, TokioExecutor>>, projection: Option<&Vec<usize>>) -> Self {
let schema = MusicSchema {}.arrow_schema();
let schema = music_schema().arrow_schema();
let schema = if let Some(projection) = &projection {
Arc::new(schema.project(projection).unwrap())
} else {
Expand Down Expand Up @@ -140,7 +142,7 @@ impl Stream for MusicStream {

impl RecordBatchStream for MusicStream {
fn schema(&self) -> SchemaRef {
MusicSchema {}.arrow_schema().clone()
music_schema().arrow_schema().clone()
}
}

Expand Down Expand Up @@ -228,17 +230,18 @@ async fn main() -> Result<()> {
// make sure the path exists
let _ = fs::create_dir_all("./db_path/music").await;

let schema: MusicSchema = Default::default();
let options = DbOption::new(
Path::from_filesystem_path(
fs::canonicalize(PathBuf::from("./db_path/music"))
.await
.unwrap(),
)
.unwrap(),
&MusicSchema,
&schema,
);

let db = DB::new(options, TokioExecutor::default(), MusicSchema)
let db = DB::new(options, TokioExecutor::default(), schema)
.await
.unwrap();
for (id, name, like) in [
Expand Down Expand Up @@ -295,3 +298,7 @@ async fn main() -> Result<()> {

Ok(())
}

fn music_schema() -> MusicSchema {
Default::default()
}
33 changes: 20 additions & 13 deletions examples/declare.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,40 @@
use std::{ops::Bound, path::PathBuf};

use bytes::Bytes;
use fusio::path::Path;
use futures_util::stream::StreamExt;
use tokio::fs;
use tonbo::{executor::tokio::TokioExecutor, record::F32, DbOption, Projection, Record, DB};
use tonbo::{executor::tokio::TokioExecutor, typed as t, DbOption, Projection, DB};

/// Use macro to define schema of column family just like ORM
/// It provides type-safe read & write API
#[derive(Record, Debug)]
#[t::record]
#[derive(Debug, Default)]
pub struct User {
#[record(primary_key)]
name: String,
email: Option<String>,
age: u8,
bytes: Bytes,
grade: F32,
bytes: Vec<u8>,
grade: f32,
}

#[tokio::main]
async fn main() {
// make sure the path exists
let _ = fs::create_dir_all("./db_path/users").await;

let schema: UserSchema = Default::default();
let options = DbOption::new(
Path::from_filesystem_path(
fs::canonicalize(PathBuf::from("./db_path/users"))
.await
.unwrap(),
)
.unwrap(),
&UserSchema,
&schema,
);
// pluggable async runtime and I/O
let db = DB::new(options, TokioExecutor::default(), UserSchema)
let db = DB::new(options, TokioExecutor::default(), schema)
.await
.unwrap();

Expand All @@ -42,8 +43,8 @@ async fn main() {
name: "Alice".into(),
email: Some("[email protected]".into()),
age: 22,
bytes: Bytes::from(vec![0, 1, 2]),
grade: 96.5.into(),
bytes: vec![0, 1, 2],
grade: 96.5,
})
.await
.unwrap();
Expand All @@ -53,7 +54,9 @@ async fn main() {
let txn = db.transaction().await;

// get from primary key
let name = "Alice".into();
let name = UserKey {
name: "Alice".into(),
};

// get the zero-copy reference of record without any allocations.
let user = txn
Expand All @@ -68,7 +71,9 @@ async fn main() {
assert_eq!(user.unwrap().get().age, Some(22));

{
let upper = "Blob".into();
let upper = UserKey {
name: "Blob".into(),
};
// range scan of user
let mut scan = txn
.scan((Bound::Included(&name), Bound::Excluded(&upper)))
Expand All @@ -87,14 +92,16 @@ async fn main() {
email: Some("[email protected]"),
age: None,
bytes: Some(&[0, 1, 2]),
grade: Some(96.5.into()),
grade: Some(96.5),
})
);
}
}

{
let upper = "Blob".into();
let upper = UserKey {
name: "Blob".into(),
};
// reverse scan of user (descending order)
let mut reverse_scan = txn
.scan((Bound::Included(&name), Bound::Excluded(&upper)))
Expand Down
49 changes: 49 additions & 0 deletions examples/simple_composite.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use tonbo::{executor::tokio::TokioExecutor, typed as t, DbOption, Path, Projection, DB};

#[t::record(key(id, name))]
#[derive(Debug, Default)]
pub struct User {
#[record(primary_key)]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an example of how to use a composite primary key

id: u64,
#[record(primary_key)]
name: String,
age: u8,
}

#[tokio::main]
async fn main() {
// Prepare a local directory for the DB
let base = "/tmp/db_path/people";
let _ = tokio::fs::create_dir_all(base).await;

// Open the DB using the generated `UserSchema`
let schema = UserSchema::default();
let options = DbOption::new(Path::from_filesystem_path(base).unwrap(), &schema);
let db: DB<User, TokioExecutor> = DB::new(options, TokioExecutor::default(), schema)
.await
.unwrap();

db.insert(User {
id: 1,
name: "Alice".into(),
age: 25,
})
.await
.unwrap();
// Get by primary key
let txn = db.transaction().await;
let key = UserKey {
id: 1,
name: "Alice".into(),
};
let user = txn.get(&key, Projection::All).await.unwrap();
assert!(user.is_some());
assert_eq!(
user.unwrap().get(),
UserRef {
id: 1,
name: "Alice",
age: Some(25),
}
);
}
71 changes: 71 additions & 0 deletions examples/typed_pilot.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use std::ops::Bound;

use fusio::path::Path;
use futures_util::StreamExt;
use tonbo::{executor::tokio::TokioExecutor, typed as t, DbOption, Projection, DB};

#[t::record]
#[derive(Debug, Clone, Default)]
pub struct Person {
#[record(primary_key)]
id: i64,
name: String,
age: Option<i16>,
}

#[tokio::main]
async fn main() {
// Prepare a local directory for the DB
let base = "/tmp/db_path/people";
let _ = tokio::fs::create_dir_all(base).await;

// Open the DB using the generated `PersonSchema`
let schema: PersonSchema = Default::default();
let options = DbOption::new(Path::from_filesystem_path(base).unwrap(), &schema);
let db: DB<Person, TokioExecutor> = DB::new(options, TokioExecutor::default(), schema)
.await
.unwrap();

// Insert a couple of rows
db.insert(Person {
id: 1,
name: "Alice".into(),
age: Some(30),
})
.await
.unwrap();
db.insert(Person {
id: 2,
name: "Bob".into(),
age: None,
})
.await
.unwrap();

// Get by primary key
{
let txn = db.transaction().await;
let got = txn
.get(&PersonKey { id: 1 }, Projection::All)
.await
.unwrap();
println!("get(1): {:?}", got.as_ref().map(|e| e.get()));
}

// Range scan with projection (only `name`)
{
let txn = db.transaction().await;
let mut scan = txn
.scan((Bound::Unbounded, Bound::Unbounded))
.projection(&["name"])
.take()
.await
.unwrap();
while let Some(entry) = scan.next().await.transpose().unwrap() {
println!("scan -> {:?}", entry.value());
}
}

// Remove a row
db.remove(PersonKey { id: 2 }).await.unwrap();
}
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[toolchain]
channel = "1.85"
channel = "1.89"
components = ["clippy", "rust-analyzer", "rustfmt"]
2 changes: 1 addition & 1 deletion src/inmem/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ pub(crate) mod tests {
fn arrow_schema(&self) -> &Arc<ArrowSchema> {
static SCHEMA: Lazy<Arc<ArrowSchema>> = Lazy::new(|| {
Arc::new(ArrowSchema::new(vec![
Field::new("_null", DataType::Boolean, false),
Field::new(magic::NULL, DataType::Boolean, false),
Field::new(magic::TS, DataType::UInt32, false),
Field::new("vstring", DataType::Utf8, false),
Field::new("vu32", DataType::UInt32, false),
Expand Down
5 changes: 3 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
//!
//! # Examples
//!
//! ```no_run
//! ```ignore
//! use std::ops::Bound;
//!
//! use fusio::path::Path;
Expand Down Expand Up @@ -121,6 +121,7 @@ pub(crate) mod snapshot;
pub mod stream;
pub mod transaction;
mod trigger;
pub mod typed;
pub mod version;
mod wal;

Expand Down Expand Up @@ -165,7 +166,7 @@ use parquet::{
use parquet_lru::{DynLruCache, NoCache};
use record::Record;
use thiserror::Error;
pub use tonbo_macros::{KeyAttributes, Record};
// Legacy derive removed in favor of typed-arrow integration.
use tracing::error;
use transaction::{CommitError, Transaction, TransactionEntry};
use trigger::FreezeTrigger;
Expand Down
1 change: 1 addition & 0 deletions src/magic.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub const TS: &str = "_ts";
pub const NULL: &str = "_null";
pub(crate) const USER_COLUMN_OFFSET: usize = 2;
4 changes: 2 additions & 2 deletions src/record/dynamic/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl DynSchema {
metadata.insert("primary_key_index".to_string(), primary_index.to_string());
let arrow_schema = Arc::new(ArrowSchema::new_with_metadata(
[
Field::new("_null", DataType::Boolean, false),
Field::new(magic::NULL, DataType::Boolean, false),
Field::new(magic::TS, DataType::UInt32, false),
]
.into_iter()
Expand Down Expand Up @@ -106,7 +106,7 @@ impl DynSchema {
let arrow_schema = ArrowSchema::try_merge(vec![
ArrowSchema::new_with_metadata(
vec![
Field::new("_null", DataType::Boolean, false),
Field::new(magic::NULL, DataType::Boolean, false),
Field::new(magic::TS, DataType::UInt32, false),
],
metadata,
Expand Down
1 change: 1 addition & 0 deletions src/record/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod key;
pub mod option;
#[cfg(test)]
pub(crate) mod test;
pub mod typed;

use std::{fmt::Debug, sync::Arc};

Expand Down
2 changes: 1 addition & 1 deletion src/record/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl Schema for StringSchema {
fn arrow_schema(&self) -> &Arc<ArrowSchema> {
static SCHEMA: Lazy<Arc<ArrowSchema>> = Lazy::new(|| {
Arc::new(ArrowSchema::new(vec![
Field::new("_null", DataType::Boolean, false),
Field::new(magic::NULL, DataType::Boolean, false),
Field::new(magic::TS, DataType::UInt32, false),
Field::new(PRIMARY_FIELD_NAME, DataType::Utf8, false),
]))
Expand Down
Loading
Loading