Skip to content

feat(runtime): implement thread affinity for runtime and dispatcher #445

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
34 changes: 28 additions & 6 deletions compio-dispatcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#![warn(missing_docs)]

use std::{
collections::HashSet,
future::Future,
io,
num::NonZeroUsize,
Expand Down Expand Up @@ -71,39 +72,52 @@ pub struct Dispatcher {

impl Dispatcher {
/// Create the dispatcher with specified number of threads.
pub(crate) fn new_impl(mut builder: DispatcherBuilder) -> io::Result<Self> {
let mut proactor_builder = builder.proactor_builder;
pub(crate) fn new_impl(builder: DispatcherBuilder) -> io::Result<Self> {
let DispatcherBuilder {
nthreads,
concurrent,
stack_size,
mut thread_affinity,
mut names,
mut proactor_builder,
} = builder;
proactor_builder.force_reuse_thread_pool();
let pool = proactor_builder.create_or_get_thread_pool();
let (sender, receiver) = unbounded::<Spawning>();

let threads = (0..builder.nthreads)
let threads = (0..nthreads)
.map({
|index| {
let proactor_builder = proactor_builder.clone();
let receiver = receiver.clone();

let thread_builder = std::thread::Builder::new();
let thread_builder = if let Some(s) = builder.stack_size {
let thread_builder = if let Some(s) = stack_size {
thread_builder.stack_size(s)
} else {
thread_builder
};
let thread_builder = if let Some(f) = &mut builder.names {
let thread_builder = if let Some(f) = &mut names {
thread_builder.name(f(index))
} else {
thread_builder
};

let cpus = if let Some(f) = &mut thread_affinity {
f(index)
} else {
HashSet::new()
};
thread_builder.spawn(move || {
Runtime::builder()
.with_proactor(proactor_builder)
.thread_affinity(cpus)
.build()
.expect("cannot create compio runtime")
.block_on(async move {
while let Ok(f) = receiver.recv_async().await {
let task = Runtime::with_current(|rt| f.spawn(rt));
if builder.concurrent {
if concurrent {
task.detach()
} else {
task.await.ok();
Expand Down Expand Up @@ -218,6 +232,7 @@ pub struct DispatcherBuilder {
nthreads: usize,
concurrent: bool,
stack_size: Option<usize>,
thread_affinity: Option<Box<dyn FnMut(usize) -> HashSet<usize>>>,
names: Option<Box<dyn FnMut(usize) -> String>>,
proactor_builder: ProactorBuilder,
}
Expand All @@ -229,6 +244,7 @@ impl DispatcherBuilder {
nthreads: available_parallelism().map(|n| n.get()).unwrap_or(1),
concurrent: true,
stack_size: None,
thread_affinity: None,
names: None,
proactor_builder: ProactorBuilder::new(),
}
Expand Down Expand Up @@ -257,6 +273,12 @@ impl DispatcherBuilder {
self
}

/// Set the thread affinity for the dispatcher.
pub fn thread_affinity(mut self, f: impl FnMut(usize) -> HashSet<usize> + 'static) -> Self {
self.thread_affinity = Some(Box::new(f));
self
}

/// Provide a function to assign names to the worker threads.
pub fn thread_names(mut self, f: impl (FnMut(usize) -> String) + 'static) -> Self {
self.names = Some(Box::new(f) as _);
Expand Down
1 change: 1 addition & 0 deletions compio-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ compio-log = { workspace = true }
async-task = "4.5.0"
cfg-if = { workspace = true, optional = true }
criterion = { workspace = true, optional = true }
core_affinity = "0.8.3"
crossbeam-queue = { workspace = true }
futures-util = { workspace = true }
once_cell = { workspace = true }
Expand Down
33 changes: 33 additions & 0 deletions compio-runtime/src/affinity.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use std::collections::HashSet;

use compio_log::*;
use core_affinity::CoreId;

/// Bind current thread to given cpus
pub fn bind_to_cpu_set(cpus: &HashSet<usize>) {
if cpus.is_empty() {
return;
}

let Some(ids) = core_affinity::get_core_ids() else {
return;
};

let ids = ids
.into_iter()
.map(|core_id| core_id.id)
.collect::<HashSet<_>>();
match (ids.iter().max(), cpus.iter().max()) {
(Some(max_id), Some(max_cpu)) if *max_cpu > *max_id => {
error!("CPU ID: {max_cpu} exceeds maximum available CPU ID: {max_id}");
}
_ => {}
}
let cpu_set = ids.intersection(cpus);
for cpu in cpu_set {
let result = core_affinity::set_for_current(CoreId { id: *cpu });
if !result {
warn!("cannot set CPU {cpu} for current thread");
}
}
}
1 change: 1 addition & 0 deletions compio-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
#![warn(missing_docs)]

mod affinity;
mod attacher;
mod runtime;

Expand Down
24 changes: 20 additions & 4 deletions compio-runtime/src/runtime/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{
any::Any,
cell::{Cell, RefCell},
collections::VecDeque,
collections::{HashSet, VecDeque},
future::{Future, ready},
io,
marker::PhantomData,
Expand Down Expand Up @@ -33,7 +33,7 @@ use send_wrapper::SendWrapper;

#[cfg(feature = "time")]
use crate::runtime::time::{TimerFuture, TimerRuntime};
use crate::{BufResult, runtime::op::OpFuture};
use crate::{BufResult, affinity::bind_to_cpu_set, runtime::op::OpFuture};

scoped_tls::scoped_thread_local!(static CURRENT_RUNTIME: Runtime);

Expand Down Expand Up @@ -125,14 +125,22 @@ impl Runtime {
}

fn with_builder(builder: &RuntimeBuilder) -> io::Result<Self> {
let RuntimeBuilder {
proactor_builder,
thread_affinity,
event_interval,
} = builder;
let id = RUNTIME_ID.get();
RUNTIME_ID.set(id + 1);
if !thread_affinity.is_empty() {
bind_to_cpu_set(thread_affinity);
}
Ok(Self {
driver: RefCell::new(builder.proactor_builder.build()?),
driver: RefCell::new(proactor_builder.build()?),
runnables: Arc::new(RunnableQueue::new()),
#[cfg(feature = "time")]
timer_runtime: RefCell::new(TimerRuntime::new()),
event_interval: builder.event_interval,
event_interval: *event_interval,
id,
_p: PhantomData,
})
Expand Down Expand Up @@ -433,6 +441,7 @@ impl criterion::async_executor::AsyncExecutor for &Runtime {
#[derive(Debug, Clone)]
pub struct RuntimeBuilder {
proactor_builder: ProactorBuilder,
thread_affinity: HashSet<usize>,
event_interval: usize,
}

Expand All @@ -448,6 +457,7 @@ impl RuntimeBuilder {
Self {
proactor_builder: ProactorBuilder::new(),
event_interval: 61,
thread_affinity: HashSet::new(),
}
}

Expand All @@ -457,6 +467,12 @@ impl RuntimeBuilder {
self
}

/// Sets the thread affinity for the runtime.
pub fn thread_affinity(&mut self, cpus: HashSet<usize>) -> &mut Self {
self.thread_affinity = cpus;
self
}

/// Sets the number of scheduler ticks after which the scheduler will poll
/// for external events (timers, I/O, and so on).
///
Expand Down
Loading