Skip to content

Commit f7b7975

Browse files
authored
Remove remote / local actor routing (risc0#3452)
Instead of routing between local and remote actors, just use loopback for local actor communication (if the actor can be remote).
1 parent e04b7ae commit f7b7975

File tree

6 files changed

+138
-325
lines changed

6 files changed

+138
-325
lines changed

risc0/r0vm/src/actors/allocator.rs

Lines changed: 4 additions & 47 deletions
Original file line number | Diff line number | Diff line change
@@ -35,18 +35,17 @@
3535
//!
3636
3737
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque, btree_map, hash_map};
38-
use std::error::Error as StdError;
3938
use std::fmt;
4039
use std::net::SocketAddr;
4140
use std::net::{IpAddr, Ipv4Addr};
4241
use std::sync::Arc;
4342

4443
use super::{
4544
RemoteActor, RemoteAllocatorRequest, RpcDisconnect,
46-
actor::{self, Actor, ActorRef, Context, Message, ReplySender},
45+
actor::{Actor, ActorRef, Context, Message, ReplySender},
4746
error::{Error, Result},
4847
protocol::{GlobalId, WorkerId, WorkerIdFmt},
49-
remote_actor_ask, routing_actor_impl,
48+
remote_actor_ask,
5049
};
5150
use derive_more::{Add, AddAssign, From, Sub, SubAssign};
5251
use http_body_util::BodyExt as _;
@@ -1042,6 +1041,7 @@ impl AllocatorActor {
10421041
#[cfg(test)]
10431042
mod allocation_tests {
10441043
use super::*;
1044+
use crate::actors::actor;
10451045

10461046
fn test_gpu_id(n: u32) -> GpuUuid {
10471047
assert!(n < 10);
@@ -1744,50 +1744,6 @@ mod allocation_tests {
17441744
// |_| | .__/ \___|
17451745
// |_|
17461746

1747-
pub(crate) enum AllocatorRouterActor {
1748-
Local(ActorRef<AllocatorActor>),
1749-
Remote(ActorRef<RemoteAllocatorActor>),
1750-
}
1751-
1752-
impl Actor for AllocatorRouterActor {}
1753-
1754-
impl AllocatorRouterActor {
1755-
pub async fn new(
1756-
addr: &Option<SocketAddr>,
1757-
local: &Option<ActorRef<AllocatorActor>>,
1758-
) -> std::result::Result<ActorRef<Self>, Box<dyn StdError>> {
1759-
if let Some(addr) = addr {
1760-
let remote =
1761-
actor::spawn(RemoteAllocatorActor::new(*addr, "RemoteAllocatorActor").await?);
1762-
Ok(actor::spawn(Self::Remote(remote)))
1763-
} else {
1764-
Ok(actor::spawn(Self::Local(
1765-
local.as_ref().ok_or("no allocator configured")?.clone(),
1766-
)))
1767-
}
1768-
}
1769-
}
1770-
1771-
routing_actor_impl!(
1772-
AllocatorRouterActor,
1773-
RegisterWorker,
1774-
Result<RegisterWorkerReply>
1775-
);
1776-
1777-
routing_actor_impl!(
1778-
AllocatorRouterActor,
1779-
ScheduleTask,
1780-
Result<ScheduleTaskReply>
1781-
);
1782-
1783-
routing_actor_impl!(AllocatorRouterActor, AllocateHardware, Result<()>);
1784-
1785-
routing_actor_impl!(AllocatorRouterActor, DeallocateHardware, Result<()>);
1786-
1787-
routing_actor_impl!(AllocatorRouterActor, EndTask, Result<()>);
1788-
1789-
routing_actor_impl!(AllocatorRouterActor, RegisterManager, Result<()>);
1790-
17911747
pub type RemoteAllocatorActor = RemoteActor<AllocatorActor>;
17921748

17931749
remote_actor_ask!(
@@ -2048,6 +2004,7 @@ pub async fn run_proxy(
20482004
#[cfg(test)]
20492005
mod proxy_tests {
20502006
use super::*;
2007+
use crate::actors::actor;
20512008
use axum_test::TestServer;
20522009
use httpmock::MockServer;
20532010
use rstest::rstest;

risc0/r0vm/src/actors/factory.rs

Lines changed: 8 additions & 49 deletions
Original file line number | Diff line number | Diff line change
@@ -14,23 +14,22 @@
1414
// SPDX-License-Identifier: Apache-2.0 OR MIT
1515

1616
use std::collections::HashMap;
17-
use std::error::Error as StdError;
18-
use std::net::SocketAddr;
1917

2018
use multi_index_map::MultiIndexMap;
2119

2220
use super::{
23-
RemoteActor, RemoteFactoryRequest, RpcDisconnect, RpcMessageId, WorkerRouterActor,
24-
actor::{self, Actor, ActorRef, Context, Message},
25-
allocator::{AllocatorRouterActor, CpuCores, GpuTokens, ScheduleTask},
21+
RemoteActor, RemoteFactoryRequest, RpcDisconnect,
22+
actor::{Actor, ActorRef, Context, Message},
23+
allocator::{CpuCores, GpuTokens, RemoteAllocatorActor, ScheduleTask},
2624
error::{Error, Result as ActorResult},
2725
job::JobActor,
2826
protocol::{
2927
GlobalId, JobId, TaskKind, WorkerId,
3028
factory::{DropJob, GetTasks, SubmitTaskMsg, TaskDoneMsg, TaskUpdateMsg},
3129
worker::TaskMsg,
3230
},
33-
remote_actor_tell, routing_actor_impl,
31+
remote_actor_tell,
32+
worker::RemoteWorkerActor,
3433
};
3534

3635
#[derive(MultiIndexMap)]
@@ -46,13 +45,13 @@ pub(crate) struct FactoryActor {
4645
workers: MultiIndexWorkerRowMap,
4746
pending_tasks: Vec<SubmitTaskMsg>,
4847
active_tasks: HashMap<GlobalId, SubmitTaskMsg>,
49-
worker_actors: HashMap<WorkerId, ActorRef<WorkerRouterActor>>,
50-
allocator: ActorRef<AllocatorRouterActor>,
48+
worker_actors: HashMap<WorkerId, ActorRef<RemoteWorkerActor>>,
49+
allocator: ActorRef<RemoteAllocatorActor>,
5150
require_gpu: bool,
5251
}
5352

5453
impl FactoryActor {
55-
pub fn new(allocator: ActorRef<AllocatorRouterActor>, require_gpu: bool) -> Self {
54+
pub fn new(allocator: ActorRef<RemoteAllocatorActor>, require_gpu: bool) -> Self {
5655
Self {
5756
jobs: HashMap::default(),
5857
workers: Default::default(),
@@ -248,46 +247,6 @@ impl Message<RpcDisconnect> for FactoryActor {
248247
// |_| | .__/ \___|
249248
// |_|
250249

251-
pub(crate) enum FactoryRouterActor {
252-
Local(ActorRef<FactoryActor>),
253-
Remote(ActorRef<RemoteFactoryActor>),
254-
}
255-
256-
impl Actor for FactoryRouterActor {}
257-
258-
impl FactoryRouterActor {
259-
pub async fn new<RemoteMsgT, FutT>(
260-
addr: &Option<SocketAddr>,
261-
local: &Option<ActorRef<FactoryActor>>,
262-
remote_msg_callback: impl FnMut(Option<RemoteMsgT>, Option<RpcMessageId>, SocketAddr) -> FutT
263-
+ Send
264-
+ 'static,
265-
) -> Result<ActorRef<Self>, Box<dyn StdError>>
266-
where
267-
RemoteMsgT: serde::de::DeserializeOwned,
268-
FutT: Future<Output = ()> + Send,
269-
{
270-
if let Some(addr) = addr {
271-
let remote = actor::spawn(
272-
RemoteFactoryActor::new_with_remote_msg_callback(*addr, remote_msg_callback)
273-
.await?,
274-
);
275-
Ok(actor::spawn(Self::Remote(remote)))
276-
} else {
277-
Ok(actor::spawn(Self::Local(
278-
local
279-
.as_ref()
280-
.ok_or("no manager found from allocator or locally")?
281-
.clone(),
282-
)))
283-
}
284-
}
285-
}
286-
287-
routing_actor_impl!(FactoryRouterActor, GetTasks, ());
288-
routing_actor_impl!(FactoryRouterActor, TaskUpdateMsg, ());
289-
routing_actor_impl!(FactoryRouterActor, TaskDoneMsg, ());
290-
291250
pub type RemoteFactoryActor = RemoteActor<FactoryActor>;
292251

293252
remote_actor_tell!(RemoteActor<FactoryActor>, GetTasks, RemoteFactoryRequest);

0 commit comments

Comments
 (0)