Skip to content

Commit 72aee01

Browse files
authored
actor system: Don't allow worker queues to become too deep, queue jobs in factory instead (risc0#3475)
This makes it so that we don't schedule more than a configurable amount of work (default 3 tasks) on a worker. This is an improvement because it delays committing a task to run on a worker until a later point in time, at which we can make a potentially better scheduling decision. This is implemented by doing the following things: - RPC system `ask` function takes an async callback - Actor code `ReplySender` `send` function becomes an async function - Add actor system `ask_callback` function that takes an async callback that gets called with the reply - This allows a reply to become a `tell` function call on another actor - Factory no longer blocks messages waiting for a response to `ScheduleTask` - If the allocator were to only schedule a task much later in time, it's important to not block all factory messages until then - Allocator only schedules a task once there is a worker available that is under the task threshold - New allocator configuration for worker threshold (defaults to 3 for now) - A new trace span is added for tasks that measures the time a task spends in the factory waiting for a worker - This ensures there is no major period of time left unaccounted for with tasks
1 parent a83d7bb commit 72aee01

File tree

12 files changed

+805
-189
lines changed

12 files changed

+805
-189
lines changed

risc0/r0vm/src/actors/actor.rs

Lines changed: 106 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,24 @@ impl<ActorT: Actor> ActorRunner<ActorT> {
129129
}
130130
}
131131

132+
/// Called when an actor handles a message
133+
async fn actor_msg<ActorT, MessageT>(
134+
actor: &mut ActorT,
135+
actor_ref: ActorRef<ActorT>,
136+
reply_sender: Option<ReplySender<<ActorT as Message<MessageT>>::Reply>>,
137+
msg: MessageT,
138+
) where
139+
ActorT: Actor + Message<MessageT>,
140+
{
141+
let mut context = Context {
142+
actor_ref,
143+
reply_sender,
144+
};
145+
Message::handle(actor, msg, &mut context).await;
146+
147+
context.maybe_send_no_reply().await;
148+
}
149+
132150
/// Spawn a new task which receives messages for the actor.
133151
pub fn spawn<ActorT: Actor>(actor: ActorT) -> ActorRef<ActorT> {
134152
let (actor_ref, actor_runner) = ActorController::run(actor);
@@ -253,6 +271,13 @@ impl<ActorT: Actor> ActorController<ActorT> {
253271
fn stop_waiter(&self) -> broadcast::Receiver<()> {
254272
self.stop.resubscribe()
255273
}
274+
275+
fn stop_reason(&self) -> Option<String> {
276+
match &self.sender {
277+
SenderState::Stopped { stop_reason } => Some(stop_reason.clone()),
278+
_ => None,
279+
}
280+
}
256281
}
257282

258283
pub struct ActorRef<ActorT: Actor> {
@@ -302,6 +327,9 @@ impl<ActorT: Actor> ActorRef<ActorT> {
302327
}
303328

304329
/// Send a message to an actor and return an object which can be used to wait for a reply.
330+
/// If the remote actor panics while handling the message, the `PendingReply` will return
331+
/// `Err(SendError::ActorNotRunning)`. If the remote actor doesn't reply, it will return
332+
/// `Err(SendError::NoReply)`.
305333
pub async fn ask_enqueue<MessageT>(
306334
&self,
307335
msg: MessageT,
@@ -310,23 +338,39 @@ impl<ActorT: Actor> ActorRef<ActorT> {
310338
MessageT: Send + 'static,
311339
ActorT: Message<MessageT>,
312340
{
313-
let actor_ref = self.clone();
314341
let (sender, reply) = oneshot::channel();
315-
let reply_sender = ReplySender { sender };
342+
self.ask_callback(msg, async |res| {
343+
let _ = sender.send(res);
344+
})
345+
.await?;
346+
Ok(PendingReply { reply })
347+
}
348+
349+
/// Send a message to an actor and call a callback with the response.
350+
/// If the remote actor panics while handling the message, the callback isn't called. If the remote
351+
/// actor doesn't reply, the callback is called with `Err(SendError::NoReply)`.
352+
pub async fn ask_callback<MessageT, FutT>(
353+
&self,
354+
msg: MessageT,
355+
callback: impl FnOnce(Result<<ActorT as Message<MessageT>>::Reply, SendError>) -> FutT
356+
+ Send
357+
+ 'static,
358+
) -> Result<(), SendError>
359+
where
360+
MessageT: Send + 'static,
361+
ActorT: Message<MessageT>,
362+
FutT: Future<Output = ()> + Send + 'static,
363+
{
364+
let actor_ref = self.clone();
365+
let reply_sender = ReplySender::new(callback);
316366

317367
let sender = self.controller.lock().unwrap().sender()?;
318368
sender
319369
.send(Box::new(move |actor| {
320-
Box::pin(async move {
321-
let mut context = Context {
322-
actor_ref,
323-
reply_sender: Some(reply_sender),
324-
};
325-
Message::handle(actor, msg, &mut context).await;
326-
})
370+
Box::pin(actor_msg(actor, actor_ref, Some(reply_sender), msg))
327371
}))
328372
.await?;
329-
Ok(PendingReply { reply })
373+
Ok(())
330374
}
331375

332376
/// Send a message to an actor and don't wait for a reply.
@@ -339,13 +383,7 @@ impl<ActorT: Actor> ActorRef<ActorT> {
339383
let sender = self.controller.lock().unwrap().sender()?;
340384
sender
341385
.send(Box::new(move |actor| {
342-
Box::pin(async move {
343-
let mut context = Context {
344-
actor_ref,
345-
reply_sender: None,
346-
};
347-
Message::handle(actor, msg, &mut context).await;
348-
})
386+
Box::pin(actor_msg(actor, actor_ref, None, msg))
349387
}))
350388
.await
351389
}
@@ -376,13 +414,7 @@ impl<ActorT: Actor> ActorRef<ActorT> {
376414
let actor_ref = self.clone();
377415
let sender = self.controller.lock().unwrap().sender()?;
378416
sender.blocking_send(Box::new(move |actor| {
379-
Box::pin(async move {
380-
let mut context = Context {
381-
actor_ref,
382-
reply_sender: None,
383-
};
384-
Message::handle(actor, msg, &mut context).await;
385-
})
417+
Box::pin(actor_msg(actor, actor_ref, None, msg))
386418
}))
387419
}
388420

@@ -399,13 +431,7 @@ impl<ActorT: Actor> ActorRef<ActorT> {
399431
let sender = self.controller.lock().unwrap().sender()?;
400432
sender
401433
.send(Box::new(move |actor| {
402-
Box::pin(async move {
403-
let mut context = Context {
404-
actor_ref,
405-
reply_sender,
406-
};
407-
Message::handle(actor, msg, &mut context).await;
408-
})
434+
Box::pin(actor_msg(actor, actor_ref, reply_sender, msg))
409435
}))
410436
.await
411437
}
@@ -435,6 +461,12 @@ impl<ActorT: Actor> ActorRef<ActorT> {
435461
controller: Arc::downgrade(&self.controller),
436462
}
437463
}
464+
465+
/// If the actor is stopped, get the reason.
466+
#[allow(dead_code)]
467+
pub fn stop_reason(&self) -> Option<String> {
468+
self.controller.lock().unwrap().stop_reason()
469+
}
438470
}
439471

440472
/// A weak reference to an actor. Holding this reference will not stop an actor from being
@@ -461,18 +493,29 @@ impl<ActorT: Actor> Clone for WeakActorRef<ActorT> {
461493
}
462494
}
463495

496+
type ReplySenderAsyncCb<ValueT> =
497+
Box<dyn FnOnce(Result<ValueT, SendError>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send>;
498+
464499
pub struct ReplySender<ValueT: Send + 'static> {
465-
sender: oneshot::Sender<Result<ValueT, SendError>>,
500+
sender: ReplySenderAsyncCb<ValueT>,
466501
}
467502

468503
impl<ValueT: Send + 'static> ReplySender<ValueT> {
469-
pub fn send(self, value: ValueT) {
470-
// If the other side is no longer listening, we throw away the reply
471-
let _ = self.sender.send(Ok(value));
504+
fn new<FutT>(sender_fn: impl FnOnce(Result<ValueT, SendError>) -> FutT + Send + 'static) -> Self
505+
where
506+
FutT: Future<Output = ()> + Send + 'static,
507+
{
508+
Self {
509+
sender: Box::new(|res| Box::pin(sender_fn(res))),
510+
}
472511
}
473512

474-
pub fn no_reply(self) {
475-
let _ = self.sender.send(Err(SendError::NoReply));
513+
pub async fn send(self, value: ValueT) {
514+
(self.sender)(Ok(value)).await;
515+
}
516+
517+
pub async fn no_reply(self) {
518+
(self.sender)(Err(SendError::NoReply)).await;
476519
}
477520
}
478521

@@ -483,14 +526,6 @@ pub struct Context<ActorT: Actor, ReplyT: Send + 'static> {
483526
reply_sender: Option<ReplySender<ReplyT>>,
484527
}
485528

486-
impl<ActorT: Actor, ReplyT: Send + 'static> Drop for Context<ActorT, ReplyT> {
487-
fn drop(&mut self) {
488-
if let Some(reply_sender) = self.reply_sender.take() {
489-
reply_sender.no_reply()
490-
}
491-
}
492-
}
493-
494529
impl<ActorT: Actor, ReplyT: Send + 'static> fmt::Debug for Context<ActorT, ReplyT> {
495530
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
496531
f.debug_struct("Context").finish_non_exhaustive()
@@ -507,9 +542,9 @@ impl<ActorT: Actor, ReplyT: Send + 'static> Context<ActorT, ReplyT> {
507542

508543
/// If the sender of the message is expecting a reply, send the given message as a reply to
509544
/// them. Otherwise, if they aren't, just ignore the given message.
510-
pub fn reply(&mut self, msg: ReplyT) {
545+
pub async fn reply(&mut self, msg: ReplyT) {
511546
if let Some(reply_sender) = self.reply_sender.take() {
512-
reply_sender.send(msg);
547+
reply_sender.send(msg).await;
513548
}
514549
}
515550

@@ -530,6 +565,12 @@ impl<ActorT: Actor, ReplyT: Send + 'static> Context<ActorT, ReplyT> {
530565
pub fn actor_ref(&self) -> ActorRef<ActorT> {
531566
self.actor_ref.clone()
532567
}
568+
569+
async fn maybe_send_no_reply(&mut self) {
570+
if let Some(reply_sender) = self.reply_sender.take() {
571+
reply_sender.no_reply().await
572+
}
573+
}
533574
}
534575

535576
/// The ability to receive and possibly reply to a certain message.
@@ -613,7 +654,7 @@ mod tests {
613654
self.ping
614655
.send(msg.clone())
615656
.expect("messages should only be received when expected");
616-
ctx.reply(Pong(msg.0));
657+
ctx.reply(Pong(msg.0)).await;
617658
}
618659
}
619660

@@ -661,6 +702,23 @@ mod tests {
661702
assert_eq!(spy.ping.recv().await.unwrap(), Ping(12));
662703
}
663704

705+
#[tokio::test]
706+
async fn ask_callback() {
707+
let (actor, mut spy) = TestActor::new();
708+
let actor_ref = spawn(actor);
709+
710+
let (sender, reply) = oneshot::channel();
711+
actor_ref
712+
.ask_callback(Ping(12), async |res| {
713+
sender.send(res.unwrap()).unwrap();
714+
})
715+
.await
716+
.unwrap();
717+
assert_eq!(reply.await.unwrap(), Pong(12));
718+
719+
assert_eq!(spy.ping.recv().await.unwrap(), Ping(12));
720+
}
721+
664722
#[tokio::test]
665723
async fn ask_with_runner() {
666724
let (actor, mut spy) = TestActor::new();

0 commit comments

Comments
 (0)