Skip to content

Commit 0297b78

Browse files
authored
Merge pull request #7640 from Turbo87/multi-queue
Add support for running multiple background worker queues
2 parents c319c55 + e1020c6 commit 0297b78

File tree

9 files changed

+114
-30
lines changed

9 files changed

+114
-30
lines changed

crates_io_worker/src/background_job.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ use serde::de::DeserializeOwned;
77
use serde::Serialize;
88
use tracing::instrument;
99

10+
pub const DEFAULT_QUEUE: &str = "default";
11+
1012
#[async_trait]
1113
pub trait BackgroundJob: Serialize + DeserializeOwned + Send + Sync + 'static {
1214
/// Unique name of the task.
@@ -19,6 +21,9 @@ pub trait BackgroundJob: Serialize + DeserializeOwned + Send + Sync + 'static {
1921
/// [Self::enqueue_with_priority] can be used to override the priority value.
2022
const PRIORITY: i16 = 0;
2123

24+
/// Job queue where this job will be executed.
25+
const QUEUE: &'static str = DEFAULT_QUEUE;
26+
2227
/// The application data provided to this job at runtime.
2328
type Context: Clone + Send + 'static;
2429

crates_io_worker/src/job_registry.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ impl<Context: Clone + Send + 'static> JobRegistry<Context> {
2929
pub fn get(&self, key: &str) -> Option<&Arc<RunTaskFn<Context>>> {
3030
self.entries.get(key)
3131
}
32+
33+
/// Returns a list of all registered job types.
34+
pub fn job_types(&self) -> Vec<String> {
35+
self.entries.keys().cloned().collect()
36+
}
3237
}
3338

3439
fn runnable<J: BackgroundJob>(ctx: J::Context, payload: serde_json::Value) -> RunTaskFnReturn {
@@ -37,3 +42,30 @@ fn runnable<J: BackgroundJob>(ctx: J::Context, payload: serde_json::Value) -> Ru
3742
job.run(ctx).await
3843
})
3944
}
45+
46+
#[cfg(test)]
47+
mod tests {
48+
use super::*;
49+
use crate::BackgroundJob;
50+
use async_trait::async_trait;
51+
use serde::{Deserialize, Serialize};
52+
53+
#[test]
54+
fn test_job_types() {
55+
#[derive(Serialize, Deserialize)]
56+
struct TestJob;
57+
58+
#[async_trait]
59+
impl BackgroundJob for TestJob {
60+
const JOB_NAME: &'static str = "test";
61+
type Context = ();
62+
async fn run(&self, _: Self::Context) -> anyhow::Result<()> {
63+
Ok(())
64+
}
65+
}
66+
67+
let mut registry = JobRegistry::default();
68+
registry.register::<TestJob>();
69+
assert_eq!(registry.job_types(), vec!["test"]);
70+
}
71+
}

crates_io_worker/src/runner.rs

Lines changed: 63 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1+
use crate::background_job::DEFAULT_QUEUE;
12
use crate::job_registry::JobRegistry;
23
use crate::worker::Worker;
34
use crate::{storage, BackgroundJob};
45
use anyhow::anyhow;
56
use diesel::prelude::*;
67
use diesel::r2d2::{ConnectionManager, Pool, PoolError, PooledConnection};
78
use futures_util::future::join_all;
9+
use std::collections::HashMap;
810
use std::time::Duration;
911
use tokio::runtime::Handle;
1012
use tokio::task::JoinHandle;
@@ -19,10 +21,8 @@ pub type PooledConn = PooledConnection<ConnectionManager<PgConnection>>;
1921
pub struct Runner<Context> {
2022
rt_handle: Handle,
2123
connection_pool: ConnectionPool,
22-
num_workers: usize,
23-
job_registry: JobRegistry<Context>,
24+
queues: HashMap<String, Queue<Context>>,
2425
context: Context,
25-
poll_interval: Duration,
2626
shutdown_when_queue_empty: bool,
2727
}
2828

@@ -31,29 +31,34 @@ impl<Context: Clone + Send + 'static> Runner<Context> {
3131
Self {
3232
rt_handle: rt_handle.clone(),
3333
connection_pool,
34-
num_workers: 1,
35-
job_registry: Default::default(),
34+
queues: HashMap::new(),
3635
context,
37-
poll_interval: DEFAULT_POLL_INTERVAL,
3836
shutdown_when_queue_empty: false,
3937
}
4038
}
4139

42-
/// Set the number of workers to spawn.
43-
pub fn num_workers(mut self, num_workers: usize) -> Self {
44-
self.num_workers = num_workers;
40+
/// Register a new job type for this job runner.
41+
pub fn register_job_type<J: BackgroundJob<Context = Context>>(mut self) -> Self {
42+
let queue = self.queues.entry(J::QUEUE.into()).or_default();
43+
queue.job_registry.register::<J>();
4544
self
4645
}
4746

48-
/// Set the interval after which each worker polls for new jobs.
49-
pub fn poll_interval(mut self, poll_interval: Duration) -> Self {
50-
self.poll_interval = poll_interval;
51-
self
47+
/// Adjust the configuration of the [DEFAULT_QUEUE] queue.
48+
pub fn configure_default_queue<F>(self, f: F) -> Self
49+
where
50+
F: FnOnce(&mut Queue<Context>) -> &Queue<Context>,
51+
{
52+
self.configure_queue(DEFAULT_QUEUE, f)
5253
}
5354

54-
/// Register a new job type for this job runner.
55-
pub fn register_job_type<J: BackgroundJob<Context = Context>>(mut self) -> Self {
56-
self.job_registry.register::<J>();
55+
/// Adjust the configuration of a queue. If the queue does not exist,
56+
/// it will be created.
57+
pub fn configure_queue<F>(mut self, name: &str, f: F) -> Self
58+
where
59+
F: FnOnce(&mut Queue<Context>) -> &Queue<Context>,
60+
{
61+
f(self.queues.entry(name.into()).or_default());
5762
self
5863
}
5964

@@ -67,24 +72,27 @@ impl<Context: Clone + Send + 'static> Runner<Context> {
6772
///
6873
/// This returns a `RunHandle` which can be used to wait for the workers to shut down.
6974
pub fn start(&self) -> RunHandle {
70-
let handles = (0..self.num_workers)
71-
.map(|i| {
72-
let name = format!("background-worker-{i}");
75+
let mut handles = Vec::new();
76+
for (queue_name, queue) in &self.queues {
77+
for i in 1..=queue.num_workers {
78+
let name = format!("background-worker-{queue_name}-{i}");
7379
info!(worker.name = %name, "Starting worker…");
7480

7581
let worker = Worker {
7682
connection_pool: self.connection_pool.clone(),
7783
context: self.context.clone(),
78-
job_registry: self.job_registry.clone(),
84+
job_registry: queue.job_registry.clone(),
7985
shutdown_when_queue_empty: self.shutdown_when_queue_empty,
80-
poll_interval: self.poll_interval,
86+
poll_interval: queue.poll_interval,
8187
};
8288

83-
self.rt_handle.spawn_blocking(move || {
89+
let handle = self.rt_handle.spawn_blocking(move || {
8490
info_span!("worker", worker.name = %name).in_scope(|| worker.run())
85-
})
86-
})
87-
.collect();
91+
});
92+
93+
handles.push(handle);
94+
}
95+
}
8896

8997
RunHandle { handles }
9098
}
@@ -121,3 +129,33 @@ impl RunHandle {
121129
});
122130
}
123131
}
132+
133+
pub struct Queue<Context> {
134+
job_registry: JobRegistry<Context>,
135+
num_workers: usize,
136+
poll_interval: Duration,
137+
}
138+
139+
impl<Context> Default for Queue<Context> {
140+
fn default() -> Self {
141+
Self {
142+
job_registry: JobRegistry::default(),
143+
num_workers: 1,
144+
poll_interval: DEFAULT_POLL_INTERVAL,
145+
}
146+
}
147+
}
148+
149+
impl<Context> Queue<Context> {
150+
/// Set the number of workers to spawn for this queue.
151+
pub fn num_workers(&mut self, num_workers: usize) -> &mut Self {
152+
self.num_workers = num_workers;
153+
self
154+
}
155+
156+
/// Set the interval after which each worker of this queue polls for new jobs.
157+
pub fn poll_interval(&mut self, poll_interval: Duration) -> &mut Self {
158+
self.poll_interval = poll_interval;
159+
self
160+
}
161+
}

crates_io_worker/src/storage.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,13 @@ fn retriable() -> Box<dyn BoxableExpression<background_jobs::table, Pg, SqlType
2525

2626
/// Finds the next job that is unlocked, and ready to be retried. If a row is
2727
/// found, it will be locked.
28-
pub(super) fn find_next_unlocked_job(conn: &mut PgConnection) -> QueryResult<BackgroundJob> {
28+
pub(super) fn find_next_unlocked_job(
29+
conn: &mut PgConnection,
30+
job_types: &[String],
31+
) -> QueryResult<BackgroundJob> {
2932
background_jobs::table
3033
.select(BackgroundJob::as_select())
34+
.filter(background_jobs::job_type.eq_any(job_types))
3135
.filter(retriable())
3236
.order((background_jobs::priority.desc(), background_jobs::id))
3337
.for_update()

crates_io_worker/src/worker.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,13 @@ impl<Context: Clone + Send + 'static> Worker<Context> {
5151
/// - `Ok(None)` if no jobs were waiting
5252
/// - `Err(...)` if there was an error retrieving the job
5353
fn run_next_job(&self) -> anyhow::Result<Option<i64>> {
54+
let job_types = &self.job_registry.job_types();
55+
5456
let conn = &mut *self.connection_pool.get()?;
5557

5658
conn.transaction(|conn| {
5759
debug!("Looking for next background worker job…");
58-
let Some(job) = storage::find_next_unlocked_job(conn).optional()? else {
60+
let Some(job) = storage::find_next_unlocked_job(conn, job_types).optional()? else {
5961
return Ok(None);
6062
};
6163

crates_io_worker/tests/runner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,6 @@ fn runner<Context: Clone + Send + 'static>(
221221
.build_unchecked(ConnectionManager::new(database_url));
222222

223223
Runner::new(&Handle::current(), connection_pool, context)
224-
.num_workers(2)
224+
.configure_default_queue(|queue| queue.num_workers(2))
225225
.shutdown_when_queue_empty()
226226
}

src/bin/background-worker.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,8 @@ fn main() -> anyhow::Result<()> {
104104
});
105105

106106
let runner = Runner::new(runtime.handle(), connection_pool, environment.clone())
107-
.num_workers(5)
107+
.configure_default_queue(|queue| queue.num_workers(5))
108+
.configure_queue("repository", |queue| queue.num_workers(1))
108109
.register_crates_io_job_types()
109110
.start();
110111

src/tests/util/test_app.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,6 @@ impl TestAppBuilder {
272272
(*app.primary_database).clone(),
273273
Arc::new(environment),
274274
)
275-
.num_workers(1)
276275
.shutdown_when_queue_empty()
277276
.register_crates_io_job_types();
278277

src/worker/jobs/git.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ impl SyncToGitIndex {
2929
impl BackgroundJob for SyncToGitIndex {
3030
const JOB_NAME: &'static str = "sync_to_git_index";
3131
const PRIORITY: i16 = 100;
32+
const QUEUE: &'static str = "repository";
3233

3334
type Context = Arc<Environment>;
3435

@@ -165,6 +166,7 @@ pub struct SquashIndex;
165166
#[async_trait]
166167
impl BackgroundJob for SquashIndex {
167168
const JOB_NAME: &'static str = "squash_index";
169+
const QUEUE: &'static str = "repository";
168170

169171
type Context = Arc<Environment>;
170172

@@ -223,6 +225,7 @@ impl NormalizeIndex {
223225
#[async_trait]
224226
impl BackgroundJob for NormalizeIndex {
225227
const JOB_NAME: &'static str = "normalize_index";
228+
const QUEUE: &'static str = "repository";
226229

227230
type Context = Arc<Environment>;
228231

0 commit comments

Comments
 (0)