Skip to content

Commit 4bfc67e

Browse files
committed
feat: begin implementation of cdl-ip-linux-io-uring
1 parent 1d1da23 commit 4bfc67e

File tree

28 files changed

+1163
-1
lines changed

28 files changed

+1163
-1
lines changed

Cargo.toml

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[workspace]
22
default-members = ["crates/cdl"]
3-
members = ["benchmark", "crates/*", "python"]
3+
members = ["benchmark", "crates/*", "contrib/*", "python"]
44
exclude = ["crates/target"]
55
resolver = "2"
66

@@ -35,6 +35,13 @@ cdl-openapi = { version = "0.1.4", path = "crates/cdl-openapi", default-features
3535
cdl-python = { version = "0.1.4", path = "python", default-features = false }
3636
cdl-store = { version = "0.1.4", path = "crates/cdl-store", default-features = false }
3737

38+
cdl-ip-broker = { version = "0.1.4", path = "contrib/cdl-ip-broker", default-features = false }
39+
cdl-ip-core = { version = "0.1.4", path = "contrib/cdl-ip-core", default-features = false }
40+
cdl-ip-eth-udp = { version = "0.1.4", path = "contrib/cdl-ip-eth-udp", default-features = false }
41+
cdl-ip-eth-tcp = { version = "0.1.4", path = "contrib/cdl-ip-eth-tcp", default-features = false }
42+
cdl-ip-linux-io-uring = { version = "0.1.4", path = "contrib/cdl-ip-linux-io-uring", default-features = false }
43+
cdl-ip-mem = { version = "0.1.4", path = "contrib/cdl-ip-mem", default-features = false }
44+
3845
anyhow = { version = "1.0", features = ["backtrace"] }
3946
argon2 = { version = "0.5" }
4047
arrow = { version = "52", default-features = false, features = [ # depends: lance
@@ -44,6 +51,7 @@ arrow = { version = "52", default-features = false, features = [ # depends: lanc
4451

4552
] }
4653
async-trait = { version = "0.1" }
54+
bitflags = { version = "2.6" }
4755
byte-unit = { version = "5.1" }
4856
bytes = { version = "1.8" }
4957
chrono = { version = "0.4" }
@@ -68,6 +76,7 @@ inflector = { package = "Inflector", version = "0.11" }
6876
itertools = { version = "0.13" }
6977
k8s-openapi = { version = "0.23", features = ["schemars", "v1_30"] }
7078
kube = { version = "0.96", default-features = false }
79+
io-uring = { version = "0.7", default-features = false }
7180
lance = { version = "0.20", default-features = false }
7281
lance-core = { version = "0.20", default-features = false } # depends: lance
7382
lance-encoding = { version = "0.20", default-features = false } # depends: lance
@@ -76,6 +85,7 @@ lance-table = { version = "0.20", default-features = false } # depends: lance
7685
libc = { version = "0.2" }
7786
maplit = { version = "1.0" }
7887
minio = { version = "0.2.0-alpha", default-features = false }
88+
nix = { version = "0.29", default-features = false }
7989
object_store = { version = "0.10" } # depends: lance
8090
opentelemetry = { version = "0.27" }
8191
opentelemetry-appender-tracing = { version = "0.27", features = [
@@ -130,3 +140,6 @@ lance-io = { git = "https://github.com/ulagbulag/lance.git", rev = "24f38842d473
130140
lance-table = { git = "https://github.com/ulagbulag/lance.git", rev = "24f38842d47333925184f9501cd337fe4e2266b5" }
131141
minio = { git = "https://github.com/ulagbulag/minio-rs.git", rev = "420ac0210f491b9df8ee54995e79386f91cb96a1" }
132142
sio = { git = "https://github.com/ulagbulag/sio-rs.git", rev = "45f33abe0d0ce624ce56a95cbe1d5ff55c4fdb67" }
143+
144+
# [profile.release]
145+
# lto = true

contrib/cdl-ip-broker/Cargo.toml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
[package]
2+
name = "cdl-ip-broker"
3+
4+
authors = { workspace = true }
5+
description = { workspace = true }
6+
documentation = "https://docs.rs/cdl-ip-broker"
7+
edition = { workspace = true }
8+
include = { workspace = true }
9+
keywords = { workspace = true }
10+
license = { workspace = true }
11+
readme = { workspace = true }
12+
rust-version = { workspace = true }
13+
homepage = { workspace = true }
14+
repository = { workspace = true }
15+
version = { workspace = true }
16+
17+
[lints]
18+
workspace = true
19+
20+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
21+
22+
[dependencies]
23+
cdl-ip-core = { workspace = true }

contrib/cdl-ip-broker/src/lib.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
pub fn add(left: u64, right: u64) -> u64 {
2+
left + right
3+
}
4+
5+
#[cfg(test)]
6+
mod tests {
7+
use super::*;
8+
9+
#[test]
10+
fn it_works() {
11+
let result = add(2, 2);
12+
assert_eq!(result, 4);
13+
}
14+
}

contrib/cdl-ip-core/Cargo.toml

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
[package]
2+
name = "cdl-ip-core"
3+
4+
authors = { workspace = true }
5+
description = { workspace = true }
6+
documentation = "https://docs.rs/cdl-ip-core"
7+
edition = { workspace = true }
8+
include = { workspace = true }
9+
keywords = { workspace = true }
10+
license = { workspace = true }
11+
readme = { workspace = true }
12+
rust-version = { workspace = true }
13+
homepage = { workspace = true }
14+
repository = { workspace = true }
15+
version = { workspace = true }
16+
17+
[lints]
18+
workspace = true
19+
20+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
21+
22+
[features]
23+
default = []
24+
serde = ["dep:serde"]
25+
26+
# scopes
27+
local = []
28+
global = []
29+
30+
[dependencies]
31+
async-trait = { workspace = true }
32+
bitflags = { workspace = true }
33+
serde = { workspace = true, optional = true, features = ["derive"] }

contrib/cdl-ip-core/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
pub mod node;
2+
pub mod task;

contrib/cdl-ip-core/src/node.rs

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
#[cfg(feature = "local")]
2+
use std::os::fd::{AsFd, AsRawFd, BorrowedFd, IntoRawFd, OwnedFd, RawFd};
3+
4+
use bitflags::bitflags;
5+
6+
/// A generic node.
7+
///
8+
pub enum Node {
9+
/// Local node
10+
#[cfg(feature = "local")]
11+
Local(LocalNode),
12+
}
13+
14+
/// A local node.
15+
///
16+
#[derive(Clone)]
17+
#[cfg(feature = "local")]
18+
pub struct LocalNode<Fd = OwnedFd> {
19+
/// A target file descriptor.
20+
pub fd: Fd,
21+
/// An offset to be accessed.
22+
/// Negative values (e.g. -1) indicate that the offset is not given.
23+
pub offset: i64,
24+
/// A fixed length.
25+
/// Negative values (e.g. -1) indicate that the length is not given.
26+
pub len: i64,
27+
/// Target file descriptor's flags.
28+
pub flags: NodeFlags,
29+
}
30+
31+
#[cfg(feature = "local")]
32+
impl<Fd> AsFd for LocalNode<Fd>
33+
where
34+
Fd: AsFd,
35+
{
36+
#[inline]
37+
fn as_fd(&self) -> BorrowedFd<'_> {
38+
self.fd.as_fd()
39+
}
40+
}
41+
42+
#[cfg(feature = "local")]
43+
impl<Fd> AsRawFd for LocalNode<Fd>
44+
where
45+
Fd: AsRawFd,
46+
{
47+
#[inline]
48+
fn as_raw_fd(&self) -> RawFd {
49+
self.fd.as_raw_fd()
50+
}
51+
}
52+
53+
#[cfg(feature = "local")]
54+
impl<Fd> IntoRawFd for LocalNode<Fd>
55+
where
56+
Fd: IntoRawFd,
57+
{
58+
#[inline]
59+
fn into_raw_fd(self) -> RawFd {
60+
self.fd.into_raw_fd()
61+
}
62+
}
63+
64+
#[cfg(feature = "local")]
impl<Fd> LocalNode<Fd> {
    /// Create a view of this node that refers to the descriptor by its raw value.
    ///
    /// The offset, length and flags are copied as-is. The node is only
    /// borrowed, so `self` keeps holding its descriptor.
    #[inline]
    pub fn as_raw(&self) -> LocalNode<RawFd>
    where
        Fd: AsRawFd,
    {
        let Self {
            fd,
            offset,
            len,
            flags,
        } = self;
        LocalNode {
            fd: fd.as_raw_fd(),
            offset: *offset,
            len: *len,
            flags: *flags,
        }
    }

    /// Return true when the node can be read.
    pub const fn is_readable(&self) -> bool {
        self.flags.contains(NodeFlags::MODE_READ)
    }

    /// Return true when the node can be written.
    pub const fn is_writeable(&self) -> bool {
        self.flags.contains(NodeFlags::MODE_WRITE)
    }

    /// Return true when the node is a kind of pipe.
    pub const fn is_piped(&self) -> bool {
        // The pipe feature has its own bit; it is independent of the
        // read/write mode bits.
        self.flags.contains(NodeFlags::FEAT_PIPE)
    }

    /// Return true when the node's size can be truncated.
    pub const fn is_feature_truncate(&self) -> bool {
        self.flags.contains(NodeFlags::FEAT_TRUNCATE)
    }
}
105+
106+
bitflags! {
107+
/// A node's flags that contains modes and features.
108+
///
109+
#[derive(Copy, Clone,Debug, PartialEq, Eq)]
110+
pub struct NodeFlags: u8 {
111+
/// Ensures the node can be read.
112+
const MODE_READ = 1 << 0;
113+
/// Ensures the node can be written.
114+
const MODE_WRITE = 1 << 1;
115+
116+
/// Ensures the node is a kind of pipe.
117+
const FEAT_PIPE = 1 << 2;
118+
/// Ensures the node's size metadata can be accessed at runtime.
119+
const FEAT_SIZED = 1 << 3;
120+
/// Ensures the node's size can be truncated.
121+
const FEAT_TRUNCATE = 1 << 4;
122+
}
123+
}

contrib/cdl-ip-core/src/task.rs

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
use std::{io, thread::spawn};
2+
3+
use crate::node::LocalNode;
4+
5+
/// A generic task builder.
6+
///
7+
pub trait TaskBuilder {
8+
/// An error type.
9+
type Error;
10+
11+
/// A target task.
12+
type Task: Task<Error = Self::Error>;
13+
14+
/// Create a new task starting with the given node.
15+
#[must_use]
16+
fn open(&self, sink: LocalNode) -> Self::Task;
17+
}
18+
19+
/// A generic task trait.
20+
///
21+
pub trait Task {
22+
/// An error type.
23+
type Error;
24+
25+
/// A target node.
26+
type Node;
27+
28+
/// Register a typed node into the task.
29+
#[must_use]
30+
fn map(&mut self, sink: Self::Node, src: Self::Node) -> &mut Self;
31+
32+
/// Complete building a task.
33+
/// Note that this task cannot be reused.
34+
#[must_use]
35+
fn finish(
36+
&mut self,
37+
src: Self::Node,
38+
) -> Result<TaskContext<Self::Node, Self::Error>, Self::Error>;
39+
}
40+
41+
/// An additional task trait that derives automatically.
42+
///
43+
pub trait TaskExt: Task {
44+
/// Register a node into the task.
45+
#[inline]
46+
fn push<Tx, Rx>(&mut self, sink: Tx, src: Rx) -> Result<(), Self::Error>
47+
where
48+
Tx: TryInto<Self::Node>,
49+
Rx: TryInto<Self::Node>,
50+
Self::Error: From<Tx::Error> + From<Rx::Error>,
51+
{
52+
let _ = self.map(sink.try_into()?, src.try_into()?);
53+
Ok(())
54+
}
55+
56+
/// Spawn the task into a new thread and return the last node.
57+
#[inline]
58+
fn to_node(&mut self, src: Self::Node) -> Result<Self::Node, Self::Error>
59+
where
60+
Self::Error: 'static + Send + From<io::Error>,
61+
{
62+
self.finish(src)?.into_node()
63+
}
64+
65+
/// Spawn the task into the current thread and wait until finished.
66+
#[inline]
67+
fn wait(&mut self, src: Self::Node) -> Result<(), Self::Error>
68+
where
69+
Self::Error: 'static + Send,
70+
{
71+
self.finish(src)?.wait()
72+
}
73+
}
74+
75+
impl<T> TaskExt for T where Self: Task {}
76+
77+
/// A lazy task that contains actual task information.
78+
///
79+
pub struct TaskContext<N, E> {
80+
bound: TaskBound<E>,
81+
node: N,
82+
}
83+
84+
impl<N, E> TaskContext<N, E>
85+
where
86+
E: 'static + Send,
87+
{
88+
/// Create a new task context.
89+
pub fn new(bound: TaskBound<E>, node: N) -> Self {
90+
Self { bound, node }
91+
}
92+
93+
/// Spawn the task into a new thread and return the last node.
94+
pub fn into_node(self) -> Result<N, E>
95+
where
96+
E: From<io::Error>,
97+
{
98+
let Self { bound, node } = self;
99+
match bound {
100+
#[cfg(feature = "local")]
101+
TaskBound::LocalThread(_) => {
102+
return Err(io::Error::new(
103+
io::ErrorKind::InvalidInput,
104+
"the task is not thread-safe",
105+
)
106+
.into());
107+
}
108+
#[cfg(feature = "local")]
109+
TaskBound::LocalProcess(f) => {
110+
spawn(f);
111+
Ok(node)
112+
}
113+
}
114+
}
115+
116+
/// Spawn the task into the current thread and wait until finished.
117+
#[cfg(feature = "local")]
118+
pub fn wait(self) -> Result<(), E> {
119+
let Self { bound, node } = self;
120+
let result = match bound {
121+
#[cfg(feature = "local")]
122+
TaskBound::LocalThread(f) => f(),
123+
#[cfg(feature = "local")]
124+
TaskBound::LocalProcess(f) => f(),
125+
};
126+
drop(node);
127+
result
128+
}
129+
}
130+
131+
/// A generic task context.
132+
///
133+
pub enum TaskBound<E> {
134+
/// A thread-unsafe local task.
135+
#[cfg(feature = "local")]
136+
LocalThread(Box<dyn FnOnce() -> Result<(), E>>),
137+
/// A thread-safe local task.
138+
#[cfg(feature = "local")]
139+
LocalProcess(Box<dyn Send + FnOnce() -> Result<(), E>>),
140+
}

0 commit comments

Comments
 (0)