
Commit 6d89fa1

Fix JobExecutor death causing infinite MailboxError loops by adding auto-restart
1 parent a6ce1f4 commit 6d89fa1
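
In outline: the queue sends ProcessOneJob to a per-repo JobExecutor actor; if that actor has died, actix resolves the send to Err(MailboxError), which previously fell into the generic "unexpected result" arm, so kick() kept retrying the dead mailbox every 10 seconds forever. The new Err arm restarts the executor (rate-limited) and kicks the fresh one. A minimal standalone sketch of the new arm ordering, with MailboxClosed as a hypothetical stand-in for actix::MailboxError and placeholder strings for the actions taken:

// Stand-in for the result JobQueue receives from info.addr.send(ProcessOneJob()):
// the outer Err means the executor's mailbox is closed (the actor is dead).
#[derive(Debug)]
struct MailboxClosed; // placeholder for actix::MailboxError in this sketch

fn on_process_one_job(result: Result<Result<bool, ()>, MailboxClosed>) -> &'static str {
    match result {
        Ok(Ok(true)) => "ran a job, kick again",
        Ok(Ok(false)) => "nothing queued, poll again in 10s",
        // New in this commit: a closed mailbox is handled explicitly and the
        // executor is restarted, instead of falling through to the catch-all arm.
        Err(_) => "mailbox closed: restart executor, then kick",
        _ => "unexpected result, log and ignore",
    }
}

fn main() {
    assert_eq!(
        on_process_one_job(Err(MailboxClosed)),
        "mailbox closed: restart executor, then kick"
    );
    assert_eq!(on_process_one_job(Ok(Ok(true))), "ran a job, kick again");
}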


2 files changed (+123, -43 lines)


src/jobs/job_executor.rs

Lines changed: 6 additions & 1 deletion
@@ -170,7 +170,7 @@ impl Handler<ProcessOneJob> for JobExecutor {
     }
 }
 
-fn start_executor(
+pub fn start_executor(
     repo: &Option<String>,
     config: &Arc<Config>,
     delta_generator: &Addr<DeltaGenerator>,
@@ -189,6 +189,8 @@ fn start_executor(
         }),
         processing_job: false,
         job_queued: false,
+        restart_count: 0,
+        last_restart: None,
     })
 }
 
@@ -212,6 +214,9 @@ pub fn start_job_executor(
     JobQueue {
         executors,
         running: true,
+        config: config.clone(),
+        delta_generator: delta_generator.clone(),
+        pool: pool.clone(),
     }
     .start()
 }
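
Making start_executor public and storing clones of config, delta_generator, and pool on JobQueue is what lets the queue rebuild a dead executor on its own; the new call site added in job_queue.rs (below) is:

let new_executor_info =
    start_executor(repo, &self.config, &self.delta_generator, &self.pool);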

src/jobs/job_queue.rs

Lines changed: 117 additions & 42 deletions
@@ -4,21 +4,23 @@ use diesel::pg::PgConnection;
 use diesel::prelude::*;
 use diesel::result::DatabaseErrorKind::SerializationFailure;
 use diesel::result::Error as DieselError;
-use log::{error, info};
+use log::{error, info, warn};
 use serde_json::json;
 use std::cell::RefCell;
 use std::collections::{HashMap, HashSet};
 use std::str;
-use std::time;
+use std::sync::Arc;
+use std::time::{Duration, Instant, SystemTime};
 
 use crate::models::{
     Job, JobDependency, JobKind, JobStatus, NewJob, PublishedState, RepoState, UpdateRepoJob,
 };
 use crate::schema;
 use crate::schema::*;
-use crate::Pool;
+use crate::{Config, Pool};
 
-use super::job_executor::{JobExecutor, ProcessOneJob, StopJobs};
+use super::job_executor::{start_executor, JobExecutor, ProcessOneJob, StopJobs};
+use crate::deltas::DeltaGenerator;
 
 // We have an async JobQueue object that wraps the sync JobExecutor, because
 // that way we can respond to incomming requests immediately and decide in
@@ -29,14 +31,83 @@ pub struct ExecutorInfo {
     pub addr: Addr<JobExecutor>,
     pub processing_job: bool,
     pub job_queued: bool,
+    pub restart_count: u32,
+    pub last_restart: Option<Instant>,
 }
 
 pub struct JobQueue {
     pub executors: HashMap<Option<String>, RefCell<ExecutorInfo>>,
     pub running: bool,
+    pub config: Arc<Config>,
+    pub delta_generator: Addr<DeltaGenerator>,
+    pub pool: Pool,
 }
 
+const MAX_RESTART_COUNT: u32 = 3;
+const RESTART_WINDOW: Duration = Duration::from_secs(3600); // 1 hour
+
 impl JobQueue {
+    fn can_restart_executor(&self, info: &ExecutorInfo) -> bool {
+        let now = Instant::now();
+
+        if let Some(last_restart) = info.last_restart {
+            if now.duration_since(last_restart) > RESTART_WINDOW {
+                return true;
+            }
+        }
+
+        info.restart_count < MAX_RESTART_COUNT
+    }
+
+    fn restart_executor(&mut self, repo: &Option<String>) -> bool {
+        let now = Instant::now();
+
+        let should_restart = {
+            let info = match self.executors.get(repo) {
+                Some(executor_info) => executor_info.borrow(),
+                None => return false,
+            };
+
+            self.can_restart_executor(&info)
+        };
+
+        if !should_restart {
+            error!("Cannot restart executor for repo {repo:?}: restart limit exceeded");
+            return false;
+        }
+
+        let new_executor_info =
+            start_executor(repo, &self.config, &self.delta_generator, &self.pool);
+
+        {
+            let mut new_info = new_executor_info.borrow_mut();
+            if let Some(old_executor_info) = self.executors.get(repo) {
+                let old_info = old_executor_info.borrow();
+                if let Some(last_restart) = old_info.last_restart {
+                    if now.duration_since(last_restart) <= RESTART_WINDOW {
+                        new_info.restart_count = old_info.restart_count + 1;
+                    } else {
+                        new_info.restart_count = 1;
+                    }
+                } else {
+                    new_info.restart_count = 1;
+                }
+            } else {
+                new_info.restart_count = 1;
+            }
+            new_info.last_restart = Some(now);
+        }
+
+        self.executors.insert(repo.clone(), new_executor_info);
+
+        warn!(
+            "Restarted dead executor for repo {repo:?} (restart #{}/{MAX_RESTART_COUNT} in window)",
+            self.executors[repo].borrow().restart_count
+        );
+
+        true
+    }
+
     fn kick(&mut self, repo: &Option<String>, ctx: &mut Context<Self>) {
         let mut info = match self.executors.get(repo) {
             None => {
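
The restart limit is a one-hour window: up to MAX_RESTART_COUNT restarts are allowed while the last restart is within RESTART_WINDOW, and once the last restart is older than that the count is reset. A minimal standalone restatement of the policy (can_restart is a hypothetical free-function version of JobQueue::can_restart_executor; the constants match the diff):

use std::time::{Duration, Instant};

const MAX_RESTART_COUNT: u32 = 3;
const RESTART_WINDOW: Duration = Duration::from_secs(3600); // 1 hour

// Hypothetical free-function version of JobQueue::can_restart_executor.
fn can_restart(restart_count: u32, last_restart: Option<Instant>) -> bool {
    // A restart older than the window means the window has expired: allow again.
    if let Some(last) = last_restart {
        if last.elapsed() > RESTART_WINDOW {
            return true;
        }
    }
    // Otherwise allow at most MAX_RESTART_COUNT restarts within the window.
    restart_count < MAX_RESTART_COUNT
}

fn main() {
    assert!(can_restart(0, None)); // never restarted
    assert!(can_restart(2, Some(Instant::now()))); // two recent restarts: still allowed
    assert!(!can_restart(3, Some(Instant::now()))); // limit reached within the window
}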
@@ -59,31 +130,44 @@ impl JobQueue {
         ctx.spawn(info.addr.send(ProcessOneJob()).into_actor(self).then(
             |result, queue, ctx| {
                 let job_queued = {
-                    let mut info = queue.executors.get(&repo).unwrap().borrow_mut();
-                    info.processing_job = false;
-                    info.job_queued
+                    match queue.executors.get(&repo) {
+                        Some(executor_info) => {
+                            let mut info = executor_info.borrow_mut();
+                            info.processing_job = false;
+                            info.job_queued
+                        }
+                        None => {
+                            error!("Executor for repo {repo:?} not found during callback");
+                            false
+                        }
+                    }
                 };
 
                 if queue.running {
                     let processed_job = match result {
                         Ok(Ok(true)) => true,
                         Ok(Ok(false)) => false,
+                        Err(_) => {
+                            error!("Executor for repo {repo:?} mailbox closed - attempting restart");
+
+                            if queue.restart_executor(&repo) {
+                                info!("Successfully restarted executor for repo {repo:?}");
+                                queue.kick(&repo, ctx);
+                            } else {
+                                error!("Failed to restart executor for repo {repo:?} - giving up");
+                            }
+                            false
+                        }
                         res => {
-                            error!("Unexpected ProcessOneJob result {:?}", res);
+                            error!("Unexpected ProcessOneJob result {res:?}");
                             false
                         }
                     };
 
-                    // If we ran a job, or a job was queued, kick again
                     if job_queued || processed_job {
                         queue.kick(&repo, ctx);
                     } else {
-                        // We send a ProcessJobs message each time we added something to the
-                        // db, but case something external modifes the db we have a 10 sec
-                        // polling loop here. Ideally this should be using NOTIFY/LISTEN
-                        // postgre, but diesel/pq-sys does not currently support it.
-
-                        ctx.run_later(time::Duration::new(10, 0), move |queue, ctx| {
+                        ctx.run_later(Duration::from_secs(10), move |queue, ctx| {
                             queue.kick(&repo, ctx);
                         });
                     }
@@ -99,7 +183,6 @@ impl Actor for JobQueue {
     type Context = Context<Self>;
 
     fn started(&mut self, ctx: &mut Context<Self>) {
-        // Run any jobs in db
         let repos = self.executors.keys().cloned().collect::<Vec<_>>();
         for repo in repos {
             self.kick(&repo, ctx);
@@ -153,7 +236,13 @@ impl Handler<StopJobQueue> for JobQueue {
 }
 
 pub fn cleanup_started_jobs(pool: &Pool) -> Result<(), diesel::result::Error> {
-    let mut conn = pool.get().unwrap();
+    let mut conn = pool.get().map_err(|e| {
+        error!("Failed to get database connection during cleanup: {e}");
+        diesel::result::Error::DatabaseError(
+            diesel::result::DatabaseErrorKind::UnableToSendCommand,
+            Box::new(format!("Connection pool error: {e}")),
+        )
+    })?;
     {
         use schema::builds::dsl::*;
         let (verifying, _) = RepoState::Committing.to_db();
@@ -222,66 +311,53 @@ pub fn queue_update_job(
     repo: &str,
     starting_job_id: Option<i32>,
 ) -> Result<(bool, Job), DieselError> {
-    /* We wrap everything in a serializable transaction, because if something else
-     * starts the job while we're adding dependencies to it the dependencies will be
-     * ignored.
-     */
-
-    let transaction_result =
-        conn
+    let transaction_result = conn
         .build_transaction()
         .serializable()
         .deferrable()
         .run(|conn| {
            let mut old_update_started_job = None;
            let mut old_update_new_job = None;
 
-            /* First look for an existing active (i.e. unstarted or running) update job matching the repo */
-            let existing_update_jobs =
-                jobs::table
+            let existing_update_jobs = jobs::table
                .order(jobs::id.desc())
                .filter(jobs::kind.eq(JobKind::UpdateRepo.to_db()))
                .filter(jobs::status.le(JobStatus::Started as i16))
                .get_results::<Job>(conn)?;
 
            for existing_update_job in existing_update_jobs {
-                if let Ok(data) = serde_json::from_str::<UpdateRepoJob>(&existing_update_job.contents) {
+                if let Ok(data) =
+                    serde_json::from_str::<UpdateRepoJob>(&existing_update_job.contents)
+                {
                    if data.repo == repo {
                        if existing_update_job.status == JobStatus::New as i16 {
                            old_update_new_job = Some(existing_update_job);
                        } else {
                            old_update_started_job = Some(existing_update_job);
                        }
-                        break
+                        break;
                    }
                }
            }
 
-            /* We found the last queued active update job for this repo.
-             * If it was not started we piggy-back on it, if it was started
-             * we make the new job depend on it to ensure we only run one
-             * update job per repo in parallel */
-
            let (is_new, update_job) = match old_update_new_job {
                Some(job) => (false, job),
                None => {
-                    /* Create a new job */
-                    let new_job =
-                        diesel::insert_into(schema::jobs::table)
+                    let new_job = diesel::insert_into(schema::jobs::table)
                        .values(NewJob {
                            kind: JobKind::UpdateRepo.to_db(),
                            repo: Some(repo.to_string()),
-                            start_after: Some(time::SystemTime::now() + time::Duration::new(delay_secs, 0)),
+                            start_after: Some(SystemTime::now() + Duration::from_secs(delay_secs)),
                            contents: json!(UpdateRepoJob {
                                repo: repo.to_string()
-                            }).to_string(),
+                            })
+                            .to_string(),
                        })
                        .get_result::<Job>(conn)?;
                    (true, new_job)
-                },
+                }
            };
 
-            /* Make new job depend previous started update for this repo (if any) */
            if let Some(previous_started_job) = old_update_started_job {
                diesel::insert_into(schema::job_dependencies::table)
                    .values(JobDependency {
@@ -303,7 +379,6 @@ pub fn queue_update_job(
            Ok((is_new, update_job))
        });
 
-    /* Retry on serialization failure */
    match transaction_result {
        Err(DieselError::DatabaseError(SerializationFailure, _)) => {
            queue_update_job(delay_secs, conn, repo, starting_job_id)
