Skip to content

Commit dd29c58

Browse files
committed
Improve FetchTask reliability / performance
1 parent 0fb3be7 commit dd29c58

File tree

2 files changed

+78
-46
lines changed

2 files changed

+78
-46
lines changed

models/actions/task.go

Lines changed: 65 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"crypto/subtle"
99
"errors"
1010
"fmt"
11+
"math/rand"
1112
"time"
1213

1314
auth_model "code.gitea.io/gitea/models/auth"
@@ -223,6 +224,20 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask
223224

224225
e := db.GetEngine(ctx)
225226

227+
// Create a new task record as early as possible to be able to reserve jobs
228+
task := &ActionTask{
229+
RunnerID: runner.ID,
230+
Status: StatusBlocked,
231+
}
232+
// This is a requirement of the database schema
233+
if err := task.GenerateToken(); err != nil {
234+
return nil, false, err
235+
}
236+
237+
if _, err := e.Insert(task); err != nil {
238+
return nil, false, err
239+
}
240+
226241
jobCond := builder.NewCond()
227242
if runner.RepoID != 0 {
228243
jobCond = builder.Eq{"repo_id": runner.RepoID}
@@ -235,18 +250,46 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask
235250
jobCond = builder.In("run_id", builder.Select("id").From("action_run").Where(jobCond))
236251
}
237252

238-
var jobs []*ActionRunJob
239-
if err := e.Where("task_id=? AND status=?", 0, StatusWaiting).And(jobCond).Asc("updated", "id").Find(&jobs); err != nil {
240-
return nil, false, err
241-
}
242-
243-
// TODO: a more efficient way to filter labels
244253
var job *ActionRunJob
245-
log.Trace("runner labels: %v", runner.AgentLabels)
246-
for _, v := range jobs {
247-
if runner.CanMatchLabels(v.RunsOn) {
248-
job = v
249-
break
254+
255+
const limit = 10
256+
// TODO: store the last search position in the runner record so the next search can continue from there
257+
// e.g. we would start again from zero if no job matches our known labels
258+
// For stable paging
259+
var lastUpdated timeutil.TimeStamp
260+
for page := 0; job == nil; page++ {
261+
var jobs []*ActionRunJob
262+
// Load only 10 jobs per batch, and only the columns needed for matching, to reduce memory / db load
263+
if err := e.Where("task_id=? AND status=? AND updated>=?", 0, StatusWaiting, lastUpdated).Cols("id", "runs_on").And(jobCond).Asc("updated", "id").Limit(limit, page*limit).Find(&jobs); err != nil {
264+
return nil, false, err
265+
}
266+
267+
// TODO: a more efficient way to filter labels
268+
log.Trace("runner labels: %v", runner.AgentLabels)
269+
backoffGen := rand.New(rand.NewSource(time.Now().UnixNano() ^ int64(runner.ID)))
270+
for _, v := range jobs {
271+
if runner.CanMatchLabels(v.RunsOn) {
272+
// Reserve our job before preparing task, otherwise continue searching
273+
v.TaskID = task.ID
274+
if n, err := UpdateRunJob(ctx, v, builder.Eq{"task_id": 0}); err != nil {
275+
return nil, false, err
276+
} else if n == 1 {
277+
var exist bool
278+
// reload to get all fields
279+
if job, exist, err = db.GetByID[ActionRunJob](ctx, v.ID); err != nil || !exist {
280+
return nil, false, err
281+
}
282+
break
283+
}
284+
}
285+
lastUpdated = v.Updated
286+
}
287+
// Randomly distribute retries over time to reduce contention
288+
jitter := time.Duration(backoffGen.Int63n(int64(util.Iif(page < 4, page+1, 5))*20)) * time.Millisecond // random jitter
289+
select {
290+
case <-ctx.Done():
291+
return nil, false, ctx.Err()
292+
case <-time.After(jitter):
250293
}
251294
}
252295
if job == nil {
@@ -261,32 +304,23 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask
261304
job.Started = now
262305
job.Status = StatusRunning
263306

264-
task := &ActionTask{
265-
JobID: job.ID,
266-
Attempt: job.Attempt,
267-
RunnerID: runner.ID,
268-
Started: now,
269-
Status: StatusRunning,
270-
RepoID: job.RepoID,
271-
OwnerID: job.OwnerID,
272-
CommitSHA: job.CommitSHA,
273-
IsForkPullRequest: job.IsForkPullRequest,
274-
}
275-
if err := task.GenerateToken(); err != nil {
276-
return nil, false, err
277-
}
278-
279307
workflowJob, err := job.ParseJob()
280308
if err != nil {
281309
return nil, false, fmt.Errorf("load job %d: %w", job.ID, err)
282310
}
283311

284-
if _, err := e.Insert(task); err != nil {
285-
return nil, false, err
286-
}
287-
312+
task.Job = job
313+
task.JobID = job.ID
314+
task.Attempt = job.Attempt
315+
task.Started = now
316+
task.Status = StatusRunning
317+
task.RepoID = job.RepoID
318+
task.OwnerID = job.OwnerID
319+
task.CommitSHA = job.CommitSHA
320+
task.IsForkPullRequest = job.IsForkPullRequest
288321
task.LogFilename = logFileName(job.Run.Repo.FullName(), task.ID)
289-
if err := UpdateTask(ctx, task, "log_filename"); err != nil {
322+
323+
if err := UpdateTask(ctx, task, "job_id", "attempt", "started", "status", "repo_id", "owner_id", "commit_sha", "is_fork_pull_request", "log_filename"); err != nil {
290324
return nil, false, err
291325
}
292326

@@ -308,15 +342,6 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask
308342
task.Steps = steps
309343
}
310344

311-
job.TaskID = task.ID
312-
if n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}); err != nil {
313-
return nil, false, err
314-
} else if n != 1 {
315-
return nil, false, nil
316-
}
317-
318-
task.Job = job
319-
320345
if err := committer.Commit(); err != nil {
321346
return nil, false, err
322347
}

routers/api/actions/runner/interceptor.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,21 @@ var withRunner = connect.WithInterceptors(connect.UnaryInterceptorFunc(func(unar
4545
return nil, status.Error(codes.Unauthenticated, "unregistered runner")
4646
}
4747

48-
cols := []string{"last_online"}
49-
runner.LastOnline = timeutil.TimeStampNow()
50-
if methodName == "UpdateTask" || methodName == "UpdateLog" {
51-
runner.LastActive = timeutil.TimeStampNow()
48+
// Reduce db writes by only updating last active/online when needed
49+
var cols []string
50+
now := timeutil.TimeStampNow()
51+
if runner.LastActive.AddDuration(actions_model.RunnerOfflineTime/2) < now {
52+
runner.LastOnline = now
53+
cols = append(cols, "last_online")
54+
}
55+
if (methodName == "UpdateTask" || methodName == "UpdateLog") && runner.LastActive.AddDuration(actions_model.RunnerIdleTime/2) < now {
56+
runner.LastActive = now
5257
cols = append(cols, "last_active")
5358
}
54-
if err := actions_model.UpdateRunner(ctx, runner, cols...); err != nil {
55-
log.Error("can't update runner status: %v", err)
59+
if cols != nil {
60+
if err := actions_model.UpdateRunner(ctx, runner, cols...); err != nil {
61+
log.Error("can't update runner status: %v", err)
62+
}
5663
}
5764

5865
ctx = context.WithValue(ctx, runnerCtxKey{}, runner)

0 commit comments

Comments
 (0)