Skip to content

Commit 1e996da

Browse files
committed
feat(benchmark): begin implementation of sync_datasets
1 parent 4706a35 commit 1e996da

16 files changed

+534
-149
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ Cargo.lock
2727
**/*.csv
2828
**/cache
2929
**/dst
30-
**/outputs
30+
**/outputs*
3131
/logs
3232
/rootfs
3333
/test

benchmark/src/args.rs

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,8 @@ impl Args {
2323
pub(super) async fn execute(self) -> Result<()> {
2424
let mut stack = InstructionStack::try_new(self.common).await?;
2525

26-
let mut error = None;
27-
for ins in self.command.to_instructions().await? {
28-
match stack.push(ins).await {
29-
Ok(()) => continue,
30-
Err(e) => {
31-
error = Some(e);
32-
break;
33-
}
34-
}
35-
}
26+
let prog = self.command.to_instructions().await?;
27+
let result = stack.run(prog).await;
3628

3729
let value = stack.cleanup().await?;
3830
let path = {
@@ -45,10 +37,7 @@ impl Args {
4537
};
4638
fs::write(&path, ::serde_json::to_string_pretty(&value)?).await?;
4739

48-
match error {
49-
None => Ok(()),
50-
Some(e) => Err(e),
51-
}
40+
result
5241
}
5342
}
5443

benchmark/src/command/create.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,11 @@ impl CreateArgs {
3838
key: "num_ponds",
3939
value: Value::Number(1usize.into()),
4040
}),
41-
Box::new(ins::create_ponds::Instruction { num_k: 1 }),
41+
Box::new(ins::create_ponds::Instruction {
42+
address: None,
43+
name: None,
44+
num_k: 1,
45+
}),
4246
Box::new(ins::elapsed_time::Instruction {
4347
label: "create_datasets",
4448
}),
@@ -56,7 +60,11 @@ impl CreateArgs {
5660
Box::new(ins::elapsed_time::Instruction {
5761
label: "create_ponds",
5862
}),
59-
Box::new(ins::create_ponds::Instruction { num_k }),
63+
Box::new(ins::create_ponds::Instruction {
64+
address: None,
65+
name: None,
66+
num_k,
67+
}),
6068
]),
6169
}
6270
}

benchmark/src/command/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
pub mod create;
2+
pub mod sync;
23

34
use anyhow::Result;
45
use clap::Subcommand;
@@ -8,12 +9,14 @@ use crate::ins::Instruction;
89
#[derive(Clone, Debug, PartialEq, Subcommand)]
910
pub enum Command {
1011
Create(self::create::CreateArgs),
12+
Sync(self::sync::SyncArgs),
1113
}
1214

1315
impl Command {
1416
pub(super) async fn to_instructions(self) -> Result<Vec<Box<dyn Instruction>>> {
1517
match self {
1618
Self::Create(args) => args.to_instructions().await,
19+
Self::Sync(args) => args.to_instructions().await,
1720
}
1821
}
1922
}

benchmark/src/command/sync.rs

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
use std::net::Ipv4Addr;
2+
3+
use anyhow::Result;
4+
use clap::Parser;
5+
use serde_json::Value;
6+
use tracing::instrument;
7+
8+
use crate::ins::{self, Instruction};
9+
10+
#[derive(Clone, Debug, PartialEq, Parser)]
11+
pub struct SyncArgs {
12+
#[arg(long, default_value_t = 1)]
13+
pub num_k: usize,
14+
15+
#[arg(long, env = "SOURCE_BUCKET_ADDRESS")]
16+
pub source_bucket_address: Ipv4Addr,
17+
}
18+
19+
impl SyncArgs {
20+
#[instrument(skip_all)]
21+
pub(super) async fn to_instructions(self) -> Result<Vec<Box<dyn Instruction>>> {
22+
let Self {
23+
num_k,
24+
source_bucket_address,
25+
} = self;
26+
27+
Ok(vec![
28+
Box::new(ins::static_metric::Instruction {
29+
key: "kind",
30+
value: Value::String("sync_datasets".into()),
31+
}),
32+
Box::new(ins::static_metric::Instruction {
33+
key: "num_datasets",
34+
value: Value::Number(num_k.into()),
35+
}),
36+
Box::new(ins::static_metric::Instruction {
37+
key: "num_ponds",
38+
value: Value::Number(1usize.into()),
39+
}),
40+
Box::new(ins::checkout_context::Instruction::new(
41+
42+
)),
43+
Box::new(ins::create_ponds::Instruction {
44+
address: None,
45+
name: None,
46+
num_k: 1,
47+
}),
48+
Box::new(ins::create_datasets::Instruction { num_k }),
49+
Box::new(ins::checkout_context::Instruction::new(
50+
51+
)),
52+
Box::new(ins::create_ponds::Instruction {
53+
address: None,
54+
name: None,
55+
num_k: 1,
56+
}),
57+
Box::new(ins::create_ponds::Instruction {
58+
address: Some(source_bucket_address),
59+
name: Some("remote".into()),
60+
num_k: 1,
61+
}),
62+
Box::new(ins::elapsed_time::Instruction {
63+
label: "sync_datasets",
64+
}),
65+
Box::new(ins::create_syncs::Instruction { num_k }),
66+
])
67+
}
68+
}

benchmark/src/ins/branch.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
use anyhow::Result;
2+
use async_trait::async_trait;
3+
use tokio::sync::Mutex;
4+
use tracing::{info, instrument, Level};
5+
6+
use super::InstructionStack;
7+
8+
pub struct Instruction {
9+
pub prog: Vec<Box<dyn super::Instruction>>,
10+
stack: Mutex<Option<InstructionStack>>,
11+
}
12+
13+
impl Instruction {
14+
pub fn new(prog: Vec<Box<dyn super::Instruction>>) -> Self {
15+
Self {
16+
prog,
17+
stack: Default::default(),
18+
}
19+
}
20+
}
21+
22+
#[async_trait]
23+
impl super::Instruction for Instruction {
24+
#[instrument(skip_all, err(level = Level::ERROR))]
25+
async fn apply(&self, parent: &mut InstructionStack) -> Result<()> {
26+
let Self { prog, stack } = self;
27+
let mut stack = stack.lock().await;
28+
stack.replace(parent.branch().await);
29+
let stack = stack.as_mut().unwrap();
30+
info!("branch");
31+
for ins in prog {
32+
ins.apply(stack).await?;
33+
}
34+
Ok(())
35+
}
36+
37+
#[instrument(skip_all, err(level = Level::ERROR))]
38+
async fn delete(&self, _parent: &mut InstructionStack) -> Result<()> {
39+
let Self { prog, stack } = self;
40+
let mut stack = match stack.lock().await.take() {
41+
Some(stack) => stack,
42+
None => return Ok(()),
43+
};
44+
for ins in prog.iter().rev() {
45+
ins.delete(&mut stack).await?;
46+
}
47+
Ok(())
48+
}
49+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
use std::mem;
2+
3+
use anyhow::Result;
4+
use async_trait::async_trait;
5+
use kube::{config::KubeConfigOptions, Client, Config};
6+
use tokio::sync::Mutex;
7+
use tracing::{info, instrument, Level};
8+
9+
use super::InstructionStack;
10+
11+
pub struct Instruction {
12+
name: String,
13+
last: Mutex<Option<Client>>,
14+
}
15+
16+
impl Instruction {
17+
pub fn new(name: String) -> Self {
18+
Self {
19+
name,
20+
last: Default::default(),
21+
}
22+
}
23+
}
24+
25+
#[async_trait]
26+
impl super::Instruction for Instruction {
27+
#[instrument(skip_all, err(level = Level::ERROR))]
28+
async fn apply(&self, stack: &mut InstructionStack) -> Result<()> {
29+
let Self { name, last } = self;
30+
info!("checkout_context: {name}");
31+
32+
let options = KubeConfigOptions {
33+
context: Some(name.clone()),
34+
..Default::default()
35+
};
36+
let config = Config::from_kubeconfig(&options).await?;
37+
let mut kube = Client::try_from(config)?;
38+
mem::swap(&mut kube, &mut stack.kube);
39+
last.lock().await.replace(kube);
40+
Ok(())
41+
}
42+
43+
#[instrument(skip_all, err(level = Level::ERROR))]
44+
async fn delete(&self, stack: &mut InstructionStack) -> Result<()> {
45+
let Self { last, .. } = self;
46+
let kube = match last.lock().await.take() {
47+
Some(kube) => kube,
48+
None => return Ok(()),
49+
};
50+
stack.kube = kube;
51+
Ok(())
52+
}
53+
}

benchmark/src/ins/create_datasets.rs

Lines changed: 54 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use tracing::{info, instrument, Level};
2424

2525
use crate::args::CommonArgs;
2626

27-
use super::Metrics;
27+
use super::InstructionStack;
2828

2929
#[derive(Copy, Clone, Debug)]
3030
pub struct Instruction {
@@ -34,8 +34,9 @@ pub struct Instruction {
3434
#[async_trait]
3535
impl super::Instruction for Instruction {
3636
#[instrument(skip_all, err(level = Level::ERROR))]
37-
async fn apply(&self, kube: &Client, args: &CommonArgs, _metrics: &mut Metrics) -> Result<()> {
37+
async fn apply(&self, stack: &mut InstructionStack) -> Result<()> {
3838
let Self { num_k } = *self;
39+
let InstructionStack { kube, args, .. } = stack;
3940
info!("create_datasets: create {num_k}");
4041

4142
let namespaces = {
@@ -109,59 +110,63 @@ impl super::Instruction for Instruction {
109110
}
110111

111112
#[instrument(skip_all, err(level = Level::ERROR))]
112-
async fn delete(&self, kube: &Client, args: &CommonArgs, _metrics: &mut Metrics) -> Result<()> {
113+
async fn delete(&self, stack: &mut InstructionStack) -> Result<()> {
113114
let Self { num_k } = *self;
115+
let InstructionStack { kube, args, .. } = stack;
114116
info!("create_datasets: delete {num_k}");
117+
delete(kube, args).await
118+
}
119+
}
115120

116-
let items: Vec<_> = {
117-
let api = Api::<ModelClaimCrd>::all(kube.clone());
118-
let lp = ListParams {
119-
label_selector: Some("cdl.ulagbulag.io/benchmark=true".into()),
120-
..Default::default()
121-
};
122-
api.list_metadata(&lp)
123-
.await?
124-
.items
125-
.into_iter()
126-
.filter_map(|item| {
127-
let name = item.name_any();
128-
let namespace = item.namespace()?;
129-
Some((namespace, name))
130-
})
131-
.collect()
132-
};
133-
134-
let dp = DeleteParams {
135-
propagation_policy: Some(PropagationPolicy::Foreground),
121+
pub(super) async fn delete(kube: &Client, args: &CommonArgs) -> Result<()> {
122+
let items: Vec<_> = {
123+
let api = Api::<ModelClaimCrd>::all(kube.clone());
124+
let lp = ListParams {
125+
label_selector: Some("cdl.ulagbulag.io/benchmark=true".into()),
136126
..Default::default()
137127
};
138-
items
139-
.iter()
140-
.map(|x| async move { Ok(x) })
141-
.collect::<FuturesUnordered<_>>()
142-
.try_for_each_concurrent(args.num_threads, |(namespace, name)| async {
143-
let api = Api::<ModelClaimCrd>::namespaced(kube.clone(), namespace);
144-
api.delete(name, &dp).await?;
145-
sleep(Duration::from_millis(args.apply_interval_ms)).await;
146-
Ok::<_, Error>(())
128+
api.list_metadata(&lp)
129+
.await?
130+
.items
131+
.into_iter()
132+
.filter_map(|item| {
133+
let name = item.name_any();
134+
let namespace = item.namespace()?;
135+
Some((namespace, name))
147136
})
148-
.await?;
137+
.collect()
138+
};
149139

150-
items
151-
.iter()
152-
.map(|x| async move { Ok(x) })
153-
.collect::<FuturesUnordered<_>>()
154-
.try_for_each_concurrent(args.num_threads, |(namespace, name)| async {
155-
let api = Api::<ModelClaimCrd>::namespaced(kube.clone(), namespace);
156-
loop {
157-
let object = api.get_metadata_opt(name).await?;
158-
if object.is_none() {
159-
break;
160-
}
161-
sleep(Duration::from_millis(args.apply_interval_ms)).await;
140+
let dp = DeleteParams {
141+
propagation_policy: Some(PropagationPolicy::Foreground),
142+
..Default::default()
143+
};
144+
items
145+
.iter()
146+
.map(|x| async move { Ok(x) })
147+
.collect::<FuturesUnordered<_>>()
148+
.try_for_each_concurrent(args.num_threads, |(namespace, name)| async {
149+
let api = Api::<ModelClaimCrd>::namespaced(kube.clone(), namespace);
150+
api.delete(name, &dp).await?;
151+
sleep(Duration::from_millis(args.apply_interval_ms)).await;
152+
Ok::<_, Error>(())
153+
})
154+
.await?;
155+
156+
items
157+
.iter()
158+
.map(|x| async move { Ok(x) })
159+
.collect::<FuturesUnordered<_>>()
160+
.try_for_each_concurrent(args.num_threads, |(namespace, name)| async {
161+
let api = Api::<ModelClaimCrd>::namespaced(kube.clone(), namespace);
162+
loop {
163+
let object = api.get_metadata_opt(name).await?;
164+
if object.is_none() {
165+
break;
162166
}
163-
Ok::<_, Error>(())
164-
})
165-
.await
166-
}
167+
sleep(Duration::from_millis(args.apply_interval_ms)).await;
168+
}
169+
Ok::<_, Error>(())
170+
})
171+
.await
167172
}

0 commit comments

Comments
 (0)