Skip to content

Commit 28ec8c4

Browse files
committed
worker/runner: Add support for running multiple queues
1 parent e904a66 commit 28ec8c4

File tree

4 files changed

+38
-19
lines changed

4 files changed

+38
-19
lines changed

crates_io_worker/src/background_job.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ use serde::de::DeserializeOwned;
77
use serde::Serialize;
88
use tracing::instrument;
99

10+
pub const DEFAULT_QUEUE: &str = "default";
11+
1012
#[async_trait]
1113
pub trait BackgroundJob: Serialize + DeserializeOwned + Send + Sync + 'static {
1214
/// Unique name of the task.
@@ -19,6 +21,9 @@ pub trait BackgroundJob: Serialize + DeserializeOwned + Send + Sync + 'static {
1921
/// [Self::enqueue_with_priority] can be used to override the priority value.
2022
const PRIORITY: i16 = 0;
2123

24+
/// Job queue where this job will be executed.
25+
const QUEUE: &'static str = DEFAULT_QUEUE;
26+
2227
/// The application data provided to this job at runtime.
2328
type Context: Clone + Send + 'static;
2429

crates_io_worker/src/runner.rs

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1+
use crate::background_job::DEFAULT_QUEUE;
12
use crate::job_registry::JobRegistry;
23
use crate::worker::Worker;
34
use crate::{storage, BackgroundJob};
45
use anyhow::anyhow;
56
use diesel::prelude::*;
67
use diesel::r2d2::{ConnectionManager, Pool, PoolError, PooledConnection};
78
use futures_util::future::join_all;
9+
use std::collections::HashMap;
810
use std::time::Duration;
911
use tokio::runtime::Handle;
1012
use tokio::task::JoinHandle;
@@ -19,7 +21,7 @@ pub type PooledConn = PooledConnection<ConnectionManager<PgConnection>>;
1921
pub struct Runner<Context> {
2022
rt_handle: Handle,
2123
connection_pool: ConnectionPool,
22-
queue: Queue<Context>,
24+
queues: HashMap<String, Queue<Context>>,
2325
context: Context,
2426
shutdown_when_queue_empty: bool,
2527
}
@@ -29,23 +31,34 @@ impl<Context: Clone + Send + 'static> Runner<Context> {
2931
Self {
3032
rt_handle: rt_handle.clone(),
3133
connection_pool,
32-
queue: Queue::default(),
34+
queues: HashMap::new(),
3335
context,
3436
shutdown_when_queue_empty: false,
3537
}
3638
}
3739

38-
pub fn configure_queue<F>(mut self, f: F) -> Self
40+
/// Register a new job type for this job runner.
41+
pub fn register_job_type<J: BackgroundJob<Context = Context>>(mut self) -> Self {
42+
let queue = self.queues.entry(J::QUEUE.into()).or_default();
43+
queue.job_registry.register::<J>();
44+
self
45+
}
46+
47+
/// Adjust the configuration of the [DEFAULT_QUEUE] queue.
48+
pub fn configure_default_queue<F>(self, f: F) -> Self
3949
where
4050
F: FnOnce(&mut Queue<Context>) -> &Queue<Context>,
4151
{
42-
f(&mut self.queue);
43-
self
52+
self.configure_queue(DEFAULT_QUEUE, f)
4453
}
4554

46-
/// Register a new job type for this job runner.
47-
pub fn register_job_type<J: BackgroundJob<Context = Context>>(mut self) -> Self {
48-
self.queue.job_registry.register::<J>();
55+
/// Adjust the configuration of a queue. If the queue does not exist,
56+
/// it will be created.
57+
pub fn configure_queue<F>(mut self, name: &str, f: F) -> Self
58+
where
59+
F: FnOnce(&mut Queue<Context>) -> &Queue<Context>,
60+
{
61+
f(self.queues.entry(name.into()).or_default());
4962
self
5063
}
5164

@@ -59,11 +72,10 @@ impl<Context: Clone + Send + 'static> Runner<Context> {
5972
///
6073
/// This returns a `RunningRunner` which can be used to wait for the workers to shutdown.
6174
pub fn start(&self) -> RunHandle {
62-
let queue = &self.queue;
63-
64-
let handles = (0..queue.num_workers)
65-
.map(|i| {
66-
let name = format!("background-worker-{i}");
75+
let mut handles = Vec::new();
76+
for (queue_name, queue) in &self.queues {
77+
for i in 1..=queue.num_workers {
78+
let name = format!("background-worker-{queue_name}-{i}");
6779
info!(worker.name = %name, "Starting worker…");
6880

6981
let worker = Worker {
@@ -74,11 +86,13 @@ impl<Context: Clone + Send + 'static> Runner<Context> {
7486
poll_interval: queue.poll_interval,
7587
};
7688

77-
self.rt_handle.spawn_blocking(move || {
89+
let handle = self.rt_handle.spawn_blocking(move || {
7890
info_span!("worker", worker.name = %name).in_scope(|| worker.run())
79-
})
80-
})
81-
.collect();
91+
});
92+
93+
handles.push(handle);
94+
}
95+
}
8296

8397
RunHandle { handles }
8498
}

crates_io_worker/tests/runner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,6 @@ fn runner<Context: Clone + Send + 'static>(
221221
.build_unchecked(ConnectionManager::new(database_url));
222222

223223
Runner::new(&Handle::current(), connection_pool, context)
224-
.configure_queue(|queue| queue.num_workers(2))
224+
.configure_default_queue(|queue| queue.num_workers(2))
225225
.shutdown_when_queue_empty()
226226
}

src/bin/background-worker.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ fn main() -> anyhow::Result<()> {
104104
});
105105

106106
let runner = Runner::new(runtime.handle(), connection_pool, environment.clone())
107-
.configure_queue(|queue| queue.num_workers(5))
107+
.configure_default_queue(|queue| queue.num_workers(5))
108108
.register_crates_io_job_types()
109109
.start();
110110

0 commit comments

Comments
 (0)