Skip to content

Commit b66b050

Browse files
committed
time: delay the Arc::clone until registering timer
There are two usage of this handle for timer: 1. Ensure the time driver is enabled. 2. Registering or clear the entry from the global wheel. For (1), we just need the `&Handle`, no need to make a clone. For (2), we can delay the `.clone()` until we are about to register the entry. Delaying the `Arc::clone` improves the performance on multi-core machine. Signed-off-by: ADD-SP <[email protected]>
1 parent 0d234c3 commit b66b050

File tree

6 files changed

+119
-58
lines changed

6 files changed

+119
-58
lines changed

tokio/src/runtime/driver.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ impl Handle {
110110
pub(crate) fn time(&self) -> &crate::runtime::time::Handle {
111111
self.time
112112
.as_ref()
113-
.expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.")
113+
.expect(crate::util::error::TIME_DISABLED_ERROR)
114114
}
115115

116116
pub(crate) fn clock(&self) -> &Clock {

tokio/src/runtime/scheduler/mod.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,24 @@ cfg_rt! {
9494
}
9595
}
9696

97+
/// # Panics
98+
///
99+
/// Panics if the current [`Context`] is not available
100+
/// in the current thread.
101+
// remove this `allow(dead_code)` when this method
102+
// is used by other other modules except the `time`.
103+
#[cfg_attr(not(feature = "time"), allow(dead_code))]
104+
#[track_caller]
105+
pub(crate) fn with_current<F, R>(f: F) -> R
106+
where
107+
F: FnOnce(&Handle) -> R,
108+
{
109+
match context::with_current(|hdl| f(hdl)) {
110+
Ok(ret) => ret,
111+
Err(e) => panic!("{e}"),
112+
}
113+
}
114+
97115
pub(crate) fn blocking_spawner(&self) -> &blocking::Spawner {
98116
match_flavor!(self, Handle(h) => &h.blocking_spawner)
99117
}
@@ -268,8 +286,19 @@ cfg_not_rt! {
268286
))]
269287
impl Handle {
270288
#[track_caller]
289+
#[cfg_attr(feature = "time", allow(dead_code))]
271290
pub(crate) fn current() -> Handle {
272291
panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR)
273292
}
293+
294+
cfg_time! {
295+
#[track_caller]
296+
pub(crate) fn with_current<F, R>(_f: F) -> R
297+
where
298+
F: FnOnce(&Handle) -> R,
299+
{
300+
panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR)
301+
}
302+
}
274303
}
275304
}

tokio/src/runtime/time/entry.rs

Lines changed: 34 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,10 @@ use crate::loom::sync::atomic::AtomicU64;
5959
use crate::loom::sync::atomic::Ordering;
6060

6161
use crate::runtime::scheduler;
62+
use crate::runtime::time;
6263
use crate::sync::AtomicWaker;
6364
use crate::time::Instant;
65+
use crate::util::error::{RUNTIME_SHUTTING_DOWN_ERROR, TIME_DISABLED_ERROR};
6466
use crate::util::linked_list;
6567

6668
use pin_project_lite::pin_project;
@@ -285,9 +287,6 @@ pin_project! {
285287
// before polling.
286288
#[derive(Debug)]
287289
pub(crate) struct TimerEntry {
288-
// Arc reference to the runtime handle. We can only free the driver after
289-
// deregistering everything from their respective timer wheels.
290-
driver: scheduler::Handle,
291290
// Shared inner structure; this is part of an intrusive linked list, and
292291
// therefore other references can exist to it while mutable references to
293292
// Entry exist.
@@ -340,6 +339,10 @@ pub(crate) struct TimerShared {
340339
/// Only accessed under the entry lock.
341340
pointers: linked_list::Pointers<TimerShared>,
342341

342+
// Arc reference to the runtime handle. We can only free the driver after
343+
// deregistering everything from their respective timer wheels.
344+
driver: scheduler::Handle,
345+
343346
/// The time when the [`TimerEntry`] was registered into the Wheel,
344347
/// [`STATE_DEREGISTERED`] means it is not registered.
345348
///
@@ -384,11 +387,19 @@ generate_addr_of_methods! {
384387

385388
impl TimerShared {
386389
pub(super) fn new() -> Self {
387-
Self {
388-
registered_when: AtomicU64::new(0),
389-
pointers: linked_list::Pointers::new(),
390-
state: StateCell::default(),
391-
_p: PhantomPinned,
390+
// ensure both scheduler handle and time driver are available,
391+
// otherwise panic
392+
let maybe_hdl =
393+
scheduler::Handle::with_current(|hdl| hdl.driver().time.as_ref().map(|_| hdl.clone()));
394+
match maybe_hdl {
395+
Some(hdl) => Self {
396+
driver: hdl,
397+
registered_when: AtomicU64::new(0),
398+
pointers: linked_list::Pointers::new(),
399+
state: StateCell::default(),
400+
_p: PhantomPinned,
401+
},
402+
None => panic!("{TIME_DISABLED_ERROR}"),
392403
}
393404
}
394405

@@ -453,6 +464,10 @@ impl TimerShared {
453464
pub(super) fn might_be_registered(&self) -> bool {
454465
self.state.might_be_registered()
455466
}
467+
468+
fn driver(&self) -> &time::Handle {
469+
self.driver.driver().time()
470+
}
456471
}
457472

458473
unsafe impl linked_list::Link for TimerShared {
@@ -479,12 +494,8 @@ unsafe impl linked_list::Link for TimerShared {
479494

480495
impl TimerEntry {
481496
#[track_caller]
482-
pub(crate) fn new(handle: scheduler::Handle, deadline: Instant) -> Self {
483-
// Panic if the time driver is not enabled
484-
let _ = handle.driver().time();
485-
497+
pub(crate) fn new(deadline: Instant) -> Self {
486498
Self {
487-
driver: handle,
488499
inner: None,
489500
deadline,
490501
registered: false,
@@ -565,15 +576,14 @@ impl TimerEntry {
565576
// driver did so far and happens-before everything the driver does in
566577
// the future. While we have the lock held, we also go ahead and
567578
// deregister the entry if necessary.
568-
unsafe { self.driver().clear_entry(NonNull::from(inner)) };
579+
unsafe { inner.driver().clear_entry(NonNull::from(inner)) };
569580
}
570581

571582
pub(crate) fn reset(mut self: Pin<&mut Self>, new_time: Instant, reregister: bool) {
572583
let this = self.as_mut().project();
573584
*this.deadline = new_time;
574585
*this.registered = reregister;
575586

576-
let tick = self.driver().time_source().deadline_to_tick(new_time);
577587
let inner = match self.inner() {
578588
Some(inner) => inner,
579589
None => {
@@ -582,15 +592,17 @@ impl TimerEntry {
582592
.expect("inner should already be initialized by `this.init_inner()`")
583593
}
584594
};
595+
let tick = inner.driver().time_source().deadline_to_tick(new_time);
585596

586597
if inner.extend_expiration(tick).is_ok() {
587598
return;
588599
}
589600

590601
if reregister {
591602
unsafe {
592-
self.driver()
593-
.reregister(&self.driver.driver().io, tick, inner.into());
603+
inner
604+
.driver()
605+
.reregister(&inner.driver.driver().io, tick, inner.into());
594606
}
595607
}
596608
}
@@ -599,12 +611,6 @@ impl TimerEntry {
599611
mut self: Pin<&mut Self>,
600612
cx: &mut Context<'_>,
601613
) -> Poll<Result<(), super::Error>> {
602-
assert!(
603-
!self.driver().is_shutdown(),
604-
"{}",
605-
crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR
606-
);
607-
608614
if !self.registered {
609615
let deadline = self.deadline;
610616
self.as_mut().reset(deadline, true);
@@ -613,16 +619,13 @@ impl TimerEntry {
613619
let inner = self
614620
.inner()
615621
.expect("inner should already be initialized by `self.reset()`");
616-
inner.state.poll(cx.waker())
617-
}
618622

619-
pub(crate) fn driver(&self) -> &super::Handle {
620-
self.driver.driver().time()
621-
}
623+
assert!(
624+
!inner.driver().is_shutdown(),
625+
"{RUNTIME_SHUTTING_DOWN_ERROR}"
626+
);
622627

623-
#[cfg(all(tokio_unstable, feature = "tracing"))]
624-
pub(crate) fn clock(&self) -> &super::Clock {
625-
self.driver.driver().clock()
628+
inner.state.poll(cx.waker())
626629
}
627630
}
628631

tokio/src/runtime/time/tests/mod.rs

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#![cfg(not(target_os = "wasi"))]
22

33
use std::{task::Context, time::Duration};
4+
use std::future::poll_fn;
45

56
#[cfg(not(loom))]
67
use futures::task::noop_waker_ref;
@@ -45,16 +46,20 @@ fn single_timer() {
4546
model(|| {
4647
let rt = rt(false);
4748
let handle = rt.handle();
49+
let handle_clone = handle.clone();
4850

49-
let handle_ = handle.clone();
5051
let jh = thread::spawn(move || {
52+
let _guard = handle_clone.enter();
53+
let clock = handle_clone.inner.driver().clock();
5154
let entry = TimerEntry::new(
52-
handle_.inner.clone(),
53-
handle_.inner.driver().clock().now() + Duration::from_secs(1),
55+
clock.now() + Duration::from_secs(1),
5456
);
5557
pin!(entry);
5658

57-
block_on(std::future::poll_fn(|cx| entry.as_mut().poll_elapsed(cx))).unwrap();
59+
block_on(poll_fn(|cx| {
60+
let _guard = handle_clone.enter();
61+
entry.as_mut().poll_elapsed(cx)
62+
})).unwrap();
5863
});
5964

6065
thread::yield_now();
@@ -74,12 +79,14 @@ fn drop_timer() {
7479
model(|| {
7580
let rt = rt(false);
7681
let handle = rt.handle();
82+
let handle_clone = handle.clone();
7783

78-
let handle_ = handle.clone();
7984
let jh = thread::spawn(move || {
85+
let _guard = handle_clone.enter();
86+
87+
let clock = handle_clone.inner.driver().clock();
8088
let entry = TimerEntry::new(
81-
handle_.inner.clone(),
82-
handle_.inner.driver().clock().now() + Duration::from_secs(1),
89+
clock.now() + Duration::from_secs(1),
8390
);
8491
pin!(entry);
8592

@@ -108,20 +115,25 @@ fn change_waker() {
108115
model(|| {
109116
let rt = rt(false);
110117
let handle = rt.handle();
118+
let handle_clone = handle.clone();
111119

112-
let handle_ = handle.clone();
113120
let jh = thread::spawn(move || {
121+
let _guard = handle_clone.enter();
122+
123+
let clock = handle_clone.inner.driver().clock();
114124
let entry = TimerEntry::new(
115-
handle_.inner.clone(),
116-
handle_.inner.driver().clock().now() + Duration::from_secs(1),
125+
clock.now() + Duration::from_secs(1),
117126
);
118127
pin!(entry);
119128

120129
let _ = entry
121130
.as_mut()
122131
.poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref()));
123132

124-
block_on(std::future::poll_fn(|cx| entry.as_mut().poll_elapsed(cx))).unwrap();
133+
block_on(poll_fn(|cx| {
134+
let _guard = handle_clone.enter();
135+
entry.as_mut().poll_elapsed(cx)
136+
})).unwrap();
125137
});
126138

127139
thread::yield_now();
@@ -143,13 +155,15 @@ fn reset_future() {
143155

144156
let rt = rt(false);
145157
let handle = rt.handle();
158+
let handle_clone = handle.clone();
146159

147-
let handle_ = handle.clone();
148160
let finished_early_ = finished_early.clone();
149161
let start = handle.inner.driver().clock().now();
150162

151163
let jh = thread::spawn(move || {
152-
let entry = TimerEntry::new(handle_.inner.clone(), start + Duration::from_secs(1));
164+
let _guard = handle_clone.enter();
165+
166+
let entry = TimerEntry::new(start + Duration::from_secs(1));
153167
pin!(entry);
154168

155169
let _ = entry
@@ -159,7 +173,10 @@ fn reset_future() {
159173
entry.as_mut().reset(start + Duration::from_secs(2), true);
160174

161175
// shouldn't complete before 2s
162-
block_on(std::future::poll_fn(|cx| entry.as_mut().poll_elapsed(cx))).unwrap();
176+
block_on(poll_fn(|cx| {
177+
let _guard = handle_clone.enter();
178+
entry.as_mut().poll_elapsed(cx)
179+
})).unwrap();
163180

164181
finished_early_.store(true, Ordering::Relaxed);
165182
});
@@ -202,13 +219,14 @@ fn normal_or_miri<T>(normal: T, miri: T) -> T {
202219
fn poll_process_levels() {
203220
let rt = rt(true);
204221
let handle = rt.handle();
222+
let clock = handle.inner.driver().clock();
223+
let _guard = handle.enter();
205224

206225
let mut entries = vec![];
207226

208227
for i in 0..normal_or_miri(1024, 64) {
209228
let mut entry = Box::pin(TimerEntry::new(
210-
handle.inner.clone(),
211-
handle.inner.driver().clock().now() + Duration::from_millis(i),
229+
clock.now() + Duration::from_millis(i),
212230
));
213231

214232
let _ = entry
@@ -239,10 +257,11 @@ fn poll_process_levels_targeted() {
239257

240258
let rt = rt(true);
241259
let handle = rt.handle();
260+
let clock = handle.inner.driver().clock();
261+
let _guard = handle.enter();
242262

243263
let e1 = TimerEntry::new(
244-
handle.inner.clone(),
245-
handle.inner.driver().clock().now() + Duration::from_millis(193),
264+
clock.now() + Duration::from_millis(193),
246265
);
247266
pin!(e1);
248267

tokio/src/time/sleep.rs

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use crate::runtime::time::TimerEntry;
22
use crate::time::{error::Error, Duration, Instant};
3+
use crate::util::error::TIME_DISABLED_ERROR;
34
use crate::util::trace;
45

6+
use crate::runtime::scheduler;
57
use pin_project_lite::pin_project;
68
use std::future::Future;
79
use std::panic::Location;
@@ -251,9 +253,11 @@ impl Sleep {
251253
deadline: Instant,
252254
location: Option<&'static Location<'static>>,
253255
) -> Sleep {
254-
use crate::runtime::scheduler;
255-
let handle = scheduler::Handle::current();
256-
let entry = TimerEntry::new(handle, deadline);
256+
// ensure both scheduler handle and time driver are available,
257+
// otherwise panic
258+
let is_time_enabled = scheduler::Handle::with_current(|hdl| hdl.driver().time.is_some());
259+
assert!(is_time_enabled, "{TIME_DISABLED_ERROR}");
260+
let entry = TimerEntry::new(deadline);
257261
#[cfg(all(tokio_unstable, feature = "tracing"))]
258262
let inner = {
259263
let handle = scheduler::Handle::current();
@@ -380,11 +384,14 @@ impl Sleep {
380384
tracing::trace_span!("runtime.resource.async_op.poll");
381385

382386
let duration = {
383-
let clock = me.entry.clock();
384-
let time_source = me.entry.driver().time_source();
385-
let now = time_source.now(clock);
386-
let deadline_tick = time_source.deadline_to_tick(deadline);
387-
deadline_tick.saturating_sub(now)
387+
scheduler::Handle::with_current(|hdl| {
388+
let driver = hdl.driver();
389+
let clock = driver.clock();
390+
let time_source = driver.time().time_source();
391+
let now = time_source.now(clock);
392+
let deadline_tick = time_source.deadline_to_tick(deadline);
393+
deadline_tick.saturating_sub(now)
394+
})
388395
};
389396

390397
tracing::trace!(

0 commit comments

Comments
 (0)