Skip to content

Commit c4b9a85

Browse files
committed
fix(benchmark): make creation more parallel
1 parent c359440 commit c4b9a85

File tree

5 files changed

+85
-42
lines changed

5 files changed

+85
-42
lines changed

benchmark/src/args.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,11 @@ impl Args {
5555
#[derive(Clone, Debug, PartialEq, Parser)]
5656
pub struct CommonArgs {
5757
#[arg(long, default_value_t = 1000)]
58-
pub check_interval_ms: u64,
58+
pub apply_interval_ms: u64,
5959

6060
#[arg(long)]
6161
pub connected: bool,
6262

63-
#[arg(long, default_value_t = 16)]
63+
#[arg(long, default_value_t = 20)]
6464
pub num_threads: usize,
6565
}

benchmark/src/ins/create_datasets.rs

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,13 @@ use kube::{
2020
};
2121
use maplit::btreemap;
2222
use tokio::time::sleep;
23-
use tracing::{instrument, Level};
23+
use tracing::{info, instrument, Level};
2424

2525
use crate::args::CommonArgs;
2626

2727
use super::Metrics;
2828

29-
#[derive(Debug)]
29+
#[derive(Copy, Clone, Debug)]
3030
pub struct Instruction {
3131
pub num_k: usize,
3232
}
@@ -35,6 +35,9 @@ pub struct Instruction {
3535
impl super::Instruction for Instruction {
3636
#[instrument(skip_all, err(level = Level::ERROR))]
3737
async fn apply(&self, kube: &Client, args: &CommonArgs, _metrics: &mut Metrics) -> Result<()> {
38+
let Self { num_k } = *self;
39+
info!("create_datasets: create {num_k}");
40+
3841
let namespaces = {
3942
let api = Api::<Namespace>::all(kube.clone());
4043
let lp = ListParams {
@@ -49,7 +52,7 @@ impl super::Instruction for Instruction {
4952
.cycle()
5053
};
5154

52-
let objects: Vec<_> = (0..self.num_k)
55+
let objects: Vec<_> = (0..num_k)
5356
.map(|k| format!("cdl-benchmark-dataset-{k:07}"))
5457
.zip(namespaces)
5558
.map(|(name, namespace)| ModelClaimCrd {
@@ -75,14 +78,21 @@ impl super::Instruction for Instruction {
7578
.collect();
7679

7780
let pp = PostParams::default();
81+
for object in &objects {
82+
let api = Api::namespaced(kube.clone(), &object.namespace().unwrap());
83+
api.create(&pp, object).await?;
84+
sleep(Duration::from_millis(
85+
args.apply_interval_ms / args.num_threads as u64,
86+
))
87+
.await;
88+
}
89+
7890
stream::iter(objects.iter().map(|x| Ok(x)))
7991
.try_for_each_concurrent(args.num_threads, |object| async {
8092
let api = Api::namespaced(kube.clone(), &object.namespace().unwrap());
81-
api.create(&pp, object).await?;
82-
8393
let name = object.name_any();
8494
loop {
85-
let object = api.get(&name).await?;
95+
let object: ModelClaimCrd = api.get(&name).await?;
8696
if object
8797
.status
8898
.as_ref()
@@ -91,7 +101,7 @@ impl super::Instruction for Instruction {
91101
{
92102
break;
93103
}
94-
sleep(Duration::from_millis(args.check_interval_ms)).await;
104+
sleep(Duration::from_millis(args.apply_interval_ms)).await;
95105
}
96106
Ok::<_, Error>(())
97107
})
@@ -100,6 +110,9 @@ impl super::Instruction for Instruction {
100110

101111
#[instrument(skip_all, err(level = Level::ERROR))]
102112
async fn delete(&self, kube: &Client, args: &CommonArgs, _metrics: &mut Metrics) -> Result<()> {
113+
let Self { num_k } = *self;
114+
info!("create_datasets: delete {num_k}");
115+
103116
let items: Vec<_> = {
104117
let api = Api::<ModelClaimCrd>::all(kube.clone());
105118
let lp = ListParams {
@@ -129,7 +142,7 @@ impl super::Instruction for Instruction {
129142
.try_for_each_concurrent(args.num_threads, |(namespace, name)| async {
130143
let api = Api::<ModelClaimCrd>::namespaced(kube.clone(), namespace);
131144
api.delete(name, &dp).await?;
132-
sleep(Duration::from_millis(args.check_interval_ms)).await;
145+
sleep(Duration::from_millis(args.apply_interval_ms)).await;
133146
Ok::<_, Error>(())
134147
})
135148
.await

benchmark/src/ins/create_ponds.rs

Lines changed: 57 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,13 @@ use kube::{
2222
};
2323
use maplit::btreemap;
2424
use tokio::time::sleep;
25-
use tracing::{instrument, Level};
25+
use tracing::{info, instrument, Level};
2626

2727
use crate::args::CommonArgs;
2828

2929
use super::Metrics;
3030

31-
#[derive(Debug)]
31+
#[derive(Copy, Clone, Debug)]
3232
pub struct Instruction {
3333
pub num_k: usize,
3434
}
@@ -37,7 +37,10 @@ pub struct Instruction {
3737
impl super::Instruction for Instruction {
3838
#[instrument(skip_all, err(level = Level::ERROR))]
3939
async fn apply(&self, kube: &Client, args: &CommonArgs, _metrics: &mut Metrics) -> Result<()> {
40-
let objects: Vec<_> = (0..self.num_k)
40+
let Self { num_k } = *self;
41+
info!("create_ponds: create {num_k}");
42+
43+
let objects: Vec<_> = (0..num_k)
4144
.map(|k| format!("cdl-benchmark-{k:07}"))
4245
.map(|namespace| ModelStorageCrd {
4346
metadata: ObjectMeta {
@@ -53,12 +56,17 @@ impl super::Instruction for Instruction {
5356
ModelStorageObjectOwnedSpec {
5457
replication: ModelStorageObjectOwnedReplicationSpec {
5558
resources: ResourceRequirements {
59+
limits: Some(btreemap! {
60+
"cpu".into() => Quantity("1".into()),
61+
"memory".into() => Quantity("1Gi".into()),
62+
}),
5663
requests: Some(btreemap! {
5764
"storage".into() => Quantity("10Ti".into()),
5865
}),
5966
..Default::default()
6067
},
61-
..Default::default()
68+
total_nodes: 1,
69+
total_volumes_per_node: 1,
6270
},
6371
..Default::default()
6472
},
@@ -71,35 +79,42 @@ impl super::Instruction for Instruction {
7179

7280
let api_ns = Api::all(kube.clone());
7381
let pp = PostParams::default();
74-
stream::iter(objects.iter().map(|x| Ok(x)))
75-
.try_for_each_concurrent(args.num_threads, |object| async {
76-
let ns = Namespace {
77-
metadata: ObjectMeta {
78-
name: object.namespace(),
79-
labels: Some(btreemap! {
80-
"cdl.ulagbulag.io/benchmark".into() => "true".into(),
81-
}),
82-
..Default::default()
83-
},
84-
spec: None,
85-
status: None,
86-
};
87-
api_ns.create(&pp, &ns).await?;
82+
for object in &objects {
83+
let ns = Namespace {
84+
metadata: ObjectMeta {
85+
name: object.namespace(),
86+
labels: Some(btreemap! {
87+
"cdl.ulagbulag.io/benchmark".into() => "true".into(),
88+
}),
89+
..Default::default()
90+
},
91+
spec: None,
92+
status: None,
93+
};
94+
api_ns.create(&pp, &ns).await?;
8895

89-
let namespace = ns.name_any();
90-
loop {
91-
if api_ns.get_metadata_opt(&namespace).await?.is_some() {
92-
break;
93-
}
94-
sleep(Duration::from_millis(args.check_interval_ms)).await;
96+
let namespace = ns.name_any();
97+
loop {
98+
if api_ns.get_metadata_opt(&namespace).await?.is_some() {
99+
break;
95100
}
101+
sleep(Duration::from_millis(args.apply_interval_ms)).await;
102+
}
96103

97-
let api = Api::namespaced(kube.clone(), &namespace);
98-
api.create(&pp, object).await?;
104+
let api = Api::namespaced(kube.clone(), &namespace);
105+
api.create(&pp, object).await?;
106+
sleep(Duration::from_millis(
107+
args.apply_interval_ms / args.num_threads as u64,
108+
))
109+
.await;
110+
}
99111

112+
stream::iter(objects.iter().map(|x| Ok(x)))
113+
.try_for_each_concurrent(args.num_threads, |object| async {
114+
let api = Api::namespaced(kube.clone(), &object.namespace().unwrap());
100115
let name = object.name_any();
101116
loop {
102-
let object = api.get(&name).await?;
117+
let object: ModelStorageCrd = api.get(&name).await?;
103118
if object
104119
.status
105120
.as_ref()
@@ -108,7 +123,7 @@ impl super::Instruction for Instruction {
108123
{
109124
break;
110125
}
111-
sleep(Duration::from_millis(args.check_interval_ms)).await;
126+
sleep(Duration::from_millis(args.apply_interval_ms)).await;
112127
}
113128
Ok::<_, Error>(())
114129
})
@@ -117,7 +132,10 @@ impl super::Instruction for Instruction {
117132

118133
#[instrument(skip_all, err(level = Level::ERROR))]
119134
async fn delete(&self, kube: &Client, args: &CommonArgs, _metrics: &mut Metrics) -> Result<()> {
120-
let namespaces: Vec<_> = (0..self.num_k)
135+
let Self { num_k } = *self;
136+
info!("create_ponds: delete {num_k}");
137+
138+
let namespaces: Vec<_> = (0..num_k)
121139
.map(|k| format!("cdl-benchmark-{k:07}"))
122140
.collect();
123141

@@ -133,13 +151,22 @@ impl super::Instruction for Instruction {
133151
.collect::<FuturesUnordered<_>>()
134152
.try_for_each_concurrent(args.num_threads, |namespace| async {
135153
api.delete(namespace, &dp).await?;
154+
sleep(Duration::from_millis(args.apply_interval_ms)).await;
155+
Ok::<_, Error>(())
156+
})
157+
.await?;
136158

159+
namespaces
160+
.iter()
161+
.map(|x| async move { Ok(x) })
162+
.collect::<FuturesUnordered<_>>()
163+
.try_for_each_concurrent(args.num_threads, |namespace| async {
137164
loop {
138165
let object = api.get_metadata_opt(namespace).await?;
139166
if object.is_none() {
140167
break;
141168
}
142-
sleep(Duration::from_millis(args.check_interval_ms)).await;
169+
sleep(Duration::from_millis(args.apply_interval_ms)).await;
143170
}
144171
Ok::<_, Error>(())
145172
})

benchmark/src/ins/elapsed_time.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use anyhow::Result;
22
use async_trait::async_trait;
33
use chrono::Utc;
44
use kube::Client;
5-
use tracing::{instrument, Level};
5+
use tracing::{info, instrument, Level};
66

77
use crate::args::CommonArgs;
88

@@ -18,6 +18,7 @@ impl super::Instruction for Instruction {
1818
#[instrument(skip_all, err(level = Level::ERROR))]
1919
async fn apply(&self, _kube: &Client, _args: &CommonArgs, metrics: &mut Metrics) -> Result<()> {
2020
let Self { label } = self;
21+
info!("elapsed_time: {label}_timestamp_begin");
2122
metrics.write(
2223
format!("{label}_timestamp_begin"),
2324
Utc::now().timestamp_micros(),
@@ -33,6 +34,7 @@ impl super::Instruction for Instruction {
3334
metrics: &mut Metrics,
3435
) -> Result<()> {
3536
let Self { label } = self;
37+
info!("elapsed_time: {label}_timestamp_end");
3638
metrics.write(
3739
format!("{label}_timestamp_end"),
3840
Utc::now().timestamp_micros(),

benchmark/src/ins/static_metric.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use anyhow::Result;
22
use async_trait::async_trait;
33
use kube::Client;
44
use serde_json::Value;
5-
use tracing::{instrument, Level};
5+
use tracing::{info, instrument, Level};
66

77
use crate::args::CommonArgs;
88

@@ -19,6 +19,7 @@ impl super::Instruction for Instruction {
1919
#[instrument(skip_all, err(level = Level::ERROR))]
2020
async fn apply(&self, _kube: &Client, _args: &CommonArgs, metrics: &mut Metrics) -> Result<()> {
2121
let Self { key, value } = self.clone();
22+
info!("static_metric: {key}={value:?}");
2223
metrics.write(key.into(), value);
2324
Ok(())
2425
}

0 commit comments

Comments
 (0)