Skip to content

Commit c8854dd

Browse files
committed
feat(benchmark): add basic pond creation benchmark support
1 parent 9b59f7c commit c8854dd

File tree

13 files changed

+444
-18
lines changed

13 files changed

+444
-18
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[workspace]
22
default-members = ["crates/cdl"]
3-
members = ["crates/*", "python"]
3+
members = ["benchmark", "crates/*", "python"]
44
exclude = ["crates/target"]
55
resolver = "2"
66

@@ -23,6 +23,7 @@ blocks_in_conditions = "allow" # opentelemetry tracing macro would be noisy
2323

2424
[workspace.dependencies]
2525
cdl = { version = "0.1.4", path = "crates/cdl", default-features = false }
26+
cdl-benchmark = { version = "0.1.4", path = "benchmark", default-features = false }
2627
cdl-catalog = { version = "0.1.4", path = "crates/cdl-catalog", default-features = false }
2728
cdl-core = { version = "0.1.4", path = "crates/cdl-core", default-features = false }
2829
cdl-fs = { version = "0.1.4", path = "crates/cdl-fs", default-features = false }

benchmark/Cargo.toml

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
[package]
2+
name = "cdl-benchmark"
3+
4+
authors = { workspace = true }
5+
description = { workspace = true }
6+
documentation = "https://docs.rs/cdl-benchmark"
7+
edition = { workspace = true }
8+
include = { workspace = true }
9+
keywords = { workspace = true }
10+
license = { workspace = true }
11+
readme = { workspace = true }
12+
rust-version = { workspace = true }
13+
homepage = { workspace = true }
14+
repository = { workspace = true }
15+
version = { workspace = true }
16+
17+
[lints]
18+
workspace = true
19+
20+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
21+
22+
[features]
23+
default = ["default-tls"]
24+
25+
# TLS
26+
default-tls = ["rustls-tls"]
27+
openssl-tls = ["kube/openssl-tls", "prometheus-http-query/native-tls"]
28+
rustls-tls = ["kube/rustls-tls", "prometheus-http-query/rustls-tls"]
29+
30+
[dependencies]
31+
cdl-k8s-core = { workspace = true, features = ["opentelemetry-all"] }
32+
cdl-openapi = { workspace = true, features = ["k8s"] }
33+
34+
anyhow = { workspace = true }
35+
async-trait = { workspace = true }
36+
clap = { workspace = true }
37+
futures = { workspace = true }
38+
k8s-openapi = { workspace = true }
39+
kube = { workspace = true, features = ["client", "runtime", "ws"] }
40+
maplit = { workspace = true }
41+
prometheus-http-query = { workspace = true }
42+
tokio = { workspace = true, features = ["full"] }
43+
tracing = { workspace = true }
44+
45+
[target.'cfg(target_os = "linux")'.dependencies]
46+
cdl-fuse = { workspace = true }

benchmark/src/args.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
use anyhow::Result;
2+
use clap::Parser;
3+
4+
use crate::ins::InstructionStack;
5+
6+
#[derive(Clone, Debug, PartialEq, Parser)]
7+
pub struct Args {
8+
#[command(subcommand)]
9+
pub command: crate::command::Command,
10+
11+
#[command(flatten)]
12+
pub common: CommonArgs,
13+
}
14+
15+
impl Args {
16+
pub(super) async fn execute(self) -> Result<()> {
17+
let mut stack = InstructionStack::try_default().await?;
18+
19+
let mut error = None;
20+
for ins in self.command.to_instructions(self.common).await? {
21+
match stack.push(ins).await {
22+
Ok(()) => continue,
23+
Err(e) => {
24+
error = Some(e);
25+
break;
26+
}
27+
}
28+
}
29+
30+
stack.cleanup().await?;
31+
match error {
32+
None => Ok(()),
33+
Some(e) => Err(e),
34+
}
35+
}
36+
}
37+
38+
#[derive(Clone, Debug, PartialEq, Parser)]
39+
pub struct CommonArgs {
40+
#[arg(long)]
41+
pub connected: bool,
42+
}

benchmark/src/command/create.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
use anyhow::Result;
2+
use clap::{Parser, ValueEnum};
3+
use tracing::instrument;
4+
5+
use crate::{
6+
args::CommonArgs,
7+
ins::{self, Instruction},
8+
};
9+
10+
#[derive(Copy, Clone, Debug, PartialEq, ValueEnum)]
11+
pub enum CreateType {
12+
Dataset,
13+
Pond,
14+
}
15+
16+
#[derive(Clone, Debug, PartialEq, Parser)]
17+
pub struct CreateArgs {
18+
pub ty: CreateType,
19+
20+
#[arg(long, default_value_t = 1)]
21+
pub num_k: usize,
22+
}
23+
24+
impl CreateArgs {
25+
#[instrument(skip_all)]
26+
pub(super) async fn to_instructions(
27+
self,
28+
common: CommonArgs,
29+
) -> Result<Vec<Box<dyn Instruction>>> {
30+
let Self { ty, num_k } = self;
31+
32+
match ty {
33+
CreateType::Dataset => Ok(vec![
34+
Box::new(ins::create_ponds::Instruction { num_k: 1 }),
35+
Box::new(ins::create_datasets::Instruction { num_k }),
36+
]),
37+
CreateType::Pond => Ok(vec![Box::new(ins::create_ponds::Instruction { num_k })]),
38+
}
39+
}
40+
}

benchmark/src/command/mod.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
pub mod create;
2+
3+
use anyhow::Result;
4+
use clap::Subcommand;
5+
6+
use crate::{args::CommonArgs, ins::Instruction};
7+
8+
#[derive(Clone, Debug, PartialEq, Subcommand)]
9+
pub enum Command {
10+
Create(self::create::CreateArgs),
11+
}
12+
13+
impl Command {
14+
pub(super) async fn to_instructions(
15+
self,
16+
common: CommonArgs,
17+
) -> Result<Vec<Box<dyn Instruction>>> {
18+
match self {
19+
Self::Create(args) => args.to_instructions(common).await,
20+
}
21+
}
22+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
use anyhow::Result;
2+
use async_trait::async_trait;
3+
use kube::Client;
4+
use tracing::{instrument, Level};
5+
6+
#[derive(Debug)]
7+
pub struct Instruction {
8+
pub num_k: usize,
9+
}
10+
11+
#[async_trait]
12+
impl super::Instruction for Instruction {
13+
#[instrument(skip_all, err(level = Level::ERROR))]
14+
async fn apply(&self, kube: &Client) -> Result<()> {
15+
Ok(())
16+
}
17+
18+
#[instrument(skip_all, err(level = Level::ERROR))]
19+
async fn delete(&self, kube: &Client) -> Result<()> {
20+
Ok(())
21+
}
22+
}

benchmark/src/ins/create_ponds.rs

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
use std::time::Duration;
2+
3+
use anyhow::{Error, Result};
4+
use async_trait::async_trait;
5+
use cdl_openapi::model_storage::{
6+
object::{
7+
ModelStorageObjectOwnedReplicationSpec, ModelStorageObjectOwnedSpec, ModelStorageObjectSpec,
8+
},
9+
ModelStorageCrd, ModelStorageKindSpec, ModelStorageSpec, ModelStorageState,
10+
};
11+
use futures::{stream::FuturesUnordered, TryStreamExt};
12+
use k8s_openapi::{
13+
api::core::v1::{Namespace, ResourceRequirements},
14+
apimachinery::pkg::api::resource::Quantity,
15+
};
16+
use kube::{
17+
api::{DeleteParams, ObjectMeta, PostParams, PropagationPolicy},
18+
Api, Client, ResourceExt,
19+
};
20+
use maplit::btreemap;
21+
use tokio::time::sleep;
22+
use tracing::{instrument, Level};
23+
24+
#[derive(Debug)]
25+
pub struct Instruction {
26+
pub num_k: usize,
27+
}
28+
29+
#[async_trait]
30+
impl super::Instruction for Instruction {
31+
#[instrument(skip_all, err(level = Level::ERROR))]
32+
async fn apply(&self, kube: &Client) -> Result<()> {
33+
let objects: Vec<_> = (0..self.num_k)
34+
.map(|k| format!("cdl-benchmark-{k:07}"))
35+
.map(|namespace| ModelStorageCrd {
36+
metadata: ObjectMeta {
37+
name: Some("object-storage".into()),
38+
namespace: Some(namespace),
39+
labels: Some(btreemap! {
40+
"cdl.ulagbulag.io/benchmark".into() => "true".into(),
41+
}),
42+
..Default::default()
43+
},
44+
spec: ModelStorageSpec {
45+
kind: ModelStorageKindSpec::ObjectStorage(ModelStorageObjectSpec::Owned(
46+
ModelStorageObjectOwnedSpec {
47+
replication: ModelStorageObjectOwnedReplicationSpec {
48+
resources: ResourceRequirements {
49+
requests: Some(btreemap! {
50+
"storage".into() => Quantity("10Ti".into()),
51+
}),
52+
..Default::default()
53+
},
54+
..Default::default()
55+
},
56+
..Default::default()
57+
},
58+
)),
59+
default: true,
60+
},
61+
status: None,
62+
})
63+
.collect();
64+
65+
let api_ns = Api::all(kube.clone());
66+
let pp = PostParams::default();
67+
objects
68+
.iter()
69+
.map(|object| async {
70+
let ns = Namespace {
71+
metadata: ObjectMeta {
72+
name: object.namespace(),
73+
labels: Some(btreemap! {
74+
"cdl.ulagbulag.io/benchmark".into() => "true".into(),
75+
}),
76+
..Default::default()
77+
},
78+
spec: None,
79+
status: None,
80+
};
81+
api_ns.create(&pp, &ns).await?;
82+
83+
let api = Api::namespaced(kube.clone(), &ns.name_any());
84+
api.create(&pp, object).await?;
85+
86+
let name = object.name_any();
87+
loop {
88+
let object = api.get(&name).await?;
89+
if object
90+
.status
91+
.as_ref()
92+
.map(|status| status.state == ModelStorageState::Ready)
93+
.unwrap_or_default()
94+
{
95+
break;
96+
}
97+
sleep(Duration::from_secs(10)).await;
98+
}
99+
Ok::<_, Error>(())
100+
})
101+
.collect::<FuturesUnordered<_>>()
102+
.try_collect()
103+
.await
104+
}
105+
106+
#[instrument(skip_all, err(level = Level::ERROR))]
107+
async fn delete(&self, kube: &Client) -> Result<()> {
108+
let namespaces: Vec<_> = (0..self.num_k)
109+
.map(|k| format!("cdl-benchmark-{k:07}"))
110+
.collect();
111+
112+
let api = Api::<Namespace>::all(kube.clone());
113+
let dp = DeleteParams {
114+
propagation_policy: Some(PropagationPolicy::Foreground),
115+
..Default::default()
116+
};
117+
namespaces
118+
.iter()
119+
.map(|namespace| async {
120+
api.delete(namespace, &dp).await?;
121+
Ok::<_, Error>(())
122+
})
123+
.collect::<FuturesUnordered<_>>()
124+
.try_collect()
125+
.await
126+
}
127+
}

benchmark/src/ins/mod.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
pub mod create_datasets;
2+
pub mod create_ponds;
3+
4+
use anyhow::Result;
5+
use async_trait::async_trait;
6+
use kube::Client;
7+
8+
#[async_trait]
9+
pub trait Instruction
10+
where
11+
Self: Send,
12+
{
13+
async fn apply(&self, kube: &Client) -> Result<()>;
14+
15+
async fn delete(&self, kube: &Client) -> Result<()>;
16+
}
17+
18+
pub struct InstructionStack {
19+
inner: Vec<Box<dyn Instruction>>,
20+
kube: Client,
21+
}
22+
23+
impl InstructionStack {
24+
pub async fn try_default() -> Result<Self> {
25+
Ok(Self {
26+
inner: Vec::default(),
27+
kube: Client::try_default().await?,
28+
})
29+
}
30+
31+
pub async fn push(&mut self, ins: Box<dyn Instruction>) -> Result<()> {
32+
self.inner.push(ins);
33+
self.inner.last().unwrap().apply(&self.kube).await?;
34+
Ok(())
35+
}
36+
37+
pub async fn cleanup(self) -> Result<()> {
38+
for ins in self.inner.into_iter().rev() {
39+
ins.delete(&self.kube).await?;
40+
}
41+
Ok(())
42+
}
43+
}

benchmark/src/main.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
mod args;
2+
mod command;
3+
mod ins;
4+
5+
use anyhow::Result;
6+
use clap::Parser;
7+
use tracing::{debug, error, info};
8+
9+
#[::tokio::main]
10+
async fn main() {
11+
let args = self::args::Args::parse();
12+
13+
::cdl_k8s_core::otel::init_once();
14+
info!("Welcome to Connected Data Lake Benchmark!");
15+
16+
match try_main(args).await {
17+
Ok(()) => info!("Done"),
18+
Err(error) => error!("{error}"),
19+
}
20+
}
21+
22+
async fn try_main(args: self::args::Args) -> Result<()> {
23+
debug!("Starting Connected Data Lake CLI Benchmark");
24+
args.execute().await
25+
}

0 commit comments

Comments
 (0)