diff --git a/compio-dispatcher/src/lib.rs b/compio-dispatcher/src/lib.rs index 3574b2bb..cfbca861 100644 --- a/compio-dispatcher/src/lib.rs +++ b/compio-dispatcher/src/lib.rs @@ -3,6 +3,7 @@ #![warn(missing_docs)] use std::{ + collections::HashSet, future::Future, io, num::NonZeroUsize, @@ -71,39 +72,52 @@ pub struct Dispatcher { impl Dispatcher { /// Create the dispatcher with specified number of threads. - pub(crate) fn new_impl(mut builder: DispatcherBuilder) -> io::Result { - let mut proactor_builder = builder.proactor_builder; + pub(crate) fn new_impl(builder: DispatcherBuilder) -> io::Result { + let DispatcherBuilder { + nthreads, + concurrent, + stack_size, + mut thread_affinity, + mut names, + mut proactor_builder, + } = builder; proactor_builder.force_reuse_thread_pool(); let pool = proactor_builder.create_or_get_thread_pool(); let (sender, receiver) = unbounded::(); - let threads = (0..builder.nthreads) + let threads = (0..nthreads) .map({ |index| { let proactor_builder = proactor_builder.clone(); let receiver = receiver.clone(); let thread_builder = std::thread::Builder::new(); - let thread_builder = if let Some(s) = builder.stack_size { + let thread_builder = if let Some(s) = stack_size { thread_builder.stack_size(s) } else { thread_builder }; - let thread_builder = if let Some(f) = &mut builder.names { + let thread_builder = if let Some(f) = &mut names { thread_builder.name(f(index)) } else { thread_builder }; + let cpus = if let Some(f) = &mut thread_affinity { + f(index) + } else { + HashSet::new() + }; thread_builder.spawn(move || { Runtime::builder() .with_proactor(proactor_builder) + .thread_affinity(cpus) .build() .expect("cannot create compio runtime") .block_on(async move { while let Ok(f) = receiver.recv_async().await { let task = Runtime::with_current(|rt| f.spawn(rt)); - if builder.concurrent { + if concurrent { task.detach() } else { task.await.ok(); @@ -218,6 +232,7 @@ pub struct DispatcherBuilder { nthreads: usize, concurrent: bool, stack_size: Option, + thread_affinity: Option HashSet>>, names: Option String>>, proactor_builder: ProactorBuilder, } @@ -229,6 +244,7 @@ impl DispatcherBuilder { nthreads: available_parallelism().map(|n| n.get()).unwrap_or(1), concurrent: true, stack_size: None, + thread_affinity: None, names: None, proactor_builder: ProactorBuilder::new(), } @@ -257,6 +273,12 @@ impl DispatcherBuilder { self } + /// Set the thread affinity for the dispatcher. + pub fn thread_affinity(mut self, f: impl FnMut(usize) -> HashSet + 'static) -> Self { + self.thread_affinity = Some(Box::new(f)); + self + } + /// Provide a function to assign names to the worker threads. pub fn thread_names(mut self, f: impl (FnMut(usize) -> String) + 'static) -> Self { self.names = Some(Box::new(f) as _); diff --git a/compio-runtime/Cargo.toml b/compio-runtime/Cargo.toml index 5ab36d70..2f8355e9 100644 --- a/compio-runtime/Cargo.toml +++ b/compio-runtime/Cargo.toml @@ -36,6 +36,7 @@ compio-log = { workspace = true } async-task = "4.5.0" cfg-if = { workspace = true, optional = true } criterion = { workspace = true, optional = true } +core_affinity = "0.8.3" crossbeam-queue = { workspace = true } futures-util = { workspace = true } once_cell = { workspace = true } diff --git a/compio-runtime/src/affinity.rs b/compio-runtime/src/affinity.rs new file mode 100644 index 00000000..a0b08ef4 --- /dev/null +++ b/compio-runtime/src/affinity.rs @@ -0,0 +1,33 @@ +use std::collections::HashSet; + +use compio_log::*; +use core_affinity::CoreId; + +/// Bind current thread to given cpus +pub fn bind_to_cpu_set(cpus: &HashSet) { + if cpus.is_empty() { + return; + } + + let Some(ids) = core_affinity::get_core_ids() else { + return; + }; + + let ids = ids + .into_iter() + .map(|core_id| core_id.id) + .collect::>(); + match (ids.iter().max(), cpus.iter().max()) { + (Some(max_id), Some(max_cpu)) if *max_cpu > *max_id => { + error!("CPU ID: {max_cpu} exceeds maximum available CPU ID: {max_id}"); + } + _ => {} + } + let cpu_set = ids.intersection(cpus); + for cpu in cpu_set { + let result = core_affinity::set_for_current(CoreId { id: *cpu }); + if !result { + warn!("cannot set CPU {cpu} for current thread"); + } + } +} diff --git a/compio-runtime/src/lib.rs b/compio-runtime/src/lib.rs index 7a25b920..a4534ca6 100644 --- a/compio-runtime/src/lib.rs +++ b/compio-runtime/src/lib.rs @@ -11,6 +11,7 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] #![warn(missing_docs)] +mod affinity; mod attacher; mod runtime; diff --git a/compio-runtime/src/runtime/mod.rs b/compio-runtime/src/runtime/mod.rs index 7832dfc0..cd90bfb6 100644 --- a/compio-runtime/src/runtime/mod.rs +++ b/compio-runtime/src/runtime/mod.rs @@ -1,7 +1,7 @@ use std::{ any::Any, cell::{Cell, RefCell}, - collections::VecDeque, + collections::{HashSet, VecDeque}, future::{Future, ready}, io, marker::PhantomData, @@ -33,7 +33,7 @@ use send_wrapper::SendWrapper; #[cfg(feature = "time")] use crate::runtime::time::{TimerFuture, TimerRuntime}; -use crate::{BufResult, runtime::op::OpFuture}; +use crate::{BufResult, affinity::bind_to_cpu_set, runtime::op::OpFuture}; scoped_tls::scoped_thread_local!(static CURRENT_RUNTIME: Runtime); @@ -125,14 +125,22 @@ impl Runtime { } fn with_builder(builder: &RuntimeBuilder) -> io::Result { + let RuntimeBuilder { + proactor_builder, + thread_affinity, + event_interval, + } = builder; let id = RUNTIME_ID.get(); RUNTIME_ID.set(id + 1); + if !thread_affinity.is_empty() { + bind_to_cpu_set(thread_affinity); + } Ok(Self { - driver: RefCell::new(builder.proactor_builder.build()?), + driver: RefCell::new(proactor_builder.build()?), runnables: Arc::new(RunnableQueue::new()), #[cfg(feature = "time")] timer_runtime: RefCell::new(TimerRuntime::new()), - event_interval: builder.event_interval, + event_interval: *event_interval, id, _p: PhantomData, }) @@ -433,6 +441,7 @@ impl criterion::async_executor::AsyncExecutor for &Runtime { #[derive(Debug, Clone)] pub struct RuntimeBuilder { proactor_builder: ProactorBuilder, + thread_affinity: HashSet, event_interval: usize, } @@ -448,6 +457,7 @@ impl RuntimeBuilder { Self { proactor_builder: ProactorBuilder::new(), event_interval: 61, + thread_affinity: HashSet::new(), } } @@ -457,6 +467,12 @@ impl RuntimeBuilder { self } + /// Sets the thread affinity for the runtime. + pub fn thread_affinity(&mut self, cpus: HashSet) -> &mut Self { + self.thread_affinity = cpus; + self + } + /// Sets the number of scheduler ticks after which the scheduler will poll /// for external events (timers, I/O, and so on). ///