Skip to content

Commit 72be62a

Browse files
authored
Merge pull request #234 from whywaita/fix/rescue-loop
Fix a flag for found job in rescue
2 parents 4964293 + 4ec7c3a commit 72be62a

File tree

2 files changed

+65
-14
lines changed

2 files changed

+65
-14
lines changed

pkg/datastore/github.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ func GetPendingWorkflowRunByRecentRepositories(ctx context.Context, ds Datastore
5656
var result []PendingWorkflowRunWithTarget
5757
// We ignore the pending run if the job is already queued.
5858
for _, pendingRun := range pendingRuns {
59+
found := false
5960
for _, job := range queuedJob {
6061
webhookEvent, err := github.ParseWebHook("workflow_job", []byte(job.CheckEventJSON))
6162
if err != nil {
@@ -70,12 +71,15 @@ func GetPendingWorkflowRunByRecentRepositories(ctx context.Context, ds Datastore
7071
}
7172

7273
if pendingRun.WorkflowRun.GetID() == workflowJob.GetWorkflowJob().GetRunID() {
73-
logger.Logf(true, "found job in datastore, So will ignore: (repo: %s, runID: %d)", pendingRun.WorkflowRun.GetRepository().GetFullName(), pendingRun.WorkflowRun.GetID())
74-
continue
74+
logger.Logf(true, "found job in datastore, So will ignore: (repo: %s, gh_run_id: %d, gh_job_id: %d)", pendingRun.WorkflowRun.GetRepository().GetFullName(), pendingRun.WorkflowRun.GetID(), workflowJob.GetWorkflowJob().GetID())
75+
found = true
76+
break
7577
}
7678
}
7779

78-
result = append(result, pendingRun)
80+
if !found {
81+
result = append(result, pendingRun)
82+
}
7983
}
8084

8185
return result, nil
@@ -140,10 +144,10 @@ func getPendingRunByRepo(ctx context.Context, client *github.Client, owner, repo
140144
oldMinutes := 10
141145
sinceMinutes := time.Since(r.CreatedAt.Time).Minutes()
142146
if sinceMinutes >= float64(oldMinutes) {
143-
logger.Logf(false, "run %d is pending over %d minutes, So will enqueue (repo: %s/%s)", r.GetID(), oldMinutes, owner, repo)
147+
logger.Logf(false, "workflow run %d is pending over %d minutes, So will enqueue (repo: %s/%s)", r.GetID(), oldMinutes, owner, repo)
144148
pendingRuns = append(pendingRuns, r)
145149
} else {
146-
logger.Logf(true, "run %d is pending, but not over %d minutes. So ignore (since: %f minutes, repo: %s/%s)", r.GetID(), oldMinutes, sinceMinutes, owner, repo)
150+
logger.Logf(true, "workflow run %d is pending, but not over %d minutes. So ignore (since: %f minutes, repo: %s/%s)", r.GetID(), oldMinutes, sinceMinutes, owner, repo)
147151
}
148152
}
149153
}

pkg/starter/starter.go

Lines changed: 56 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,12 @@ func (s *Starter) run(ctx context.Context, ch chan datastore.Job) error {
147147
c, _ := AddInstanceRetryCount.LoadOrStore(job.UUID, 0)
148148
count, _ := c.(int)
149149

150-
logger.Logf(true, "found new job: %s", job.UUID)
150+
runID, jobID, err := extractWorkflowIDs(job)
151+
if err != nil {
152+
logger.Logf(true, "found new job: %s (repo: %s)", job.UUID, job.Repository)
153+
} else {
154+
logger.Logf(true, "found new job: %s (gh_run_id: %d, gh_job_id: %d, repo: %s)", job.UUID, runID, jobID, job.Repository)
155+
}
151156
CountWaiting.Add(1)
152157
if err := sem.Acquire(ctx, 1); err != nil {
153158
return fmt.Errorf("failed to Acquire: %w", err)
@@ -180,9 +185,33 @@ func (s *Starter) run(ctx context.Context, ch chan datastore.Job) error {
180185
}
181186
}
182187

188+
// extractWorkflowIDs extracts GitHub workflow run ID and job ID from a datastore.Job
189+
func extractWorkflowIDs(job datastore.Job) (runID int64, jobID int64, err error) {
190+
webhookEvent, err := github.ParseWebHook("workflow_job", []byte(job.CheckEventJSON))
191+
if err != nil {
192+
return 0, 0, fmt.Errorf("failed to parse webhook: %w", err)
193+
}
194+
195+
workflowJob, ok := webhookEvent.(*github.WorkflowJobEvent)
196+
if !ok {
197+
return 0, 0, fmt.Errorf("failed to cast to WorkflowJobEvent")
198+
}
199+
200+
if workflowJob.GetWorkflowJob() == nil {
201+
return 0, 0, fmt.Errorf("workflow job is nil")
202+
}
203+
204+
return workflowJob.GetWorkflowJob().GetRunID(), workflowJob.GetWorkflowJob().GetID(), nil
205+
}
206+
183207
// ProcessJob is process job
184208
func (s *Starter) ProcessJob(ctx context.Context, job datastore.Job) error {
185-
logger.Logf(false, "start job (job id: %s)\n", job.UUID.String())
209+
runID, jobID, err := extractWorkflowIDs(job)
210+
if err != nil {
211+
logger.Logf(false, "start job (job id: %s, repo: %s)\n", job.UUID.String(), job.Repository)
212+
} else {
213+
logger.Logf(false, "start job (job id: %s, gh_run_id: %d, gh_job_id: %d, repo: %s)\n", job.UUID.String(), runID, jobID, job.Repository)
214+
}
186215

187216
isOK, err := s.safety.Check(&job)
188217
if err != nil {
@@ -207,10 +236,19 @@ func (s *Starter) ProcessJob(ctx context.Context, job datastore.Job) error {
207236
defer cancel()
208237
cloudID, ipAddress, shoesType, resourceType, err := s.bung(cctx, job, *target)
209238
if err != nil {
210-
logger.Logf(false, "failed to bung (target ID: %s, job ID: %s): %+v", job.TargetID, job.UUID, err)
239+
runID2, jobID2, extractErr := extractWorkflowIDs(job)
240+
if extractErr != nil {
241+
logger.Logf(false, "failed to bung (target ID: %s, job ID: %s): %+v", job.TargetID, job.UUID, err)
242+
} else {
243+
logger.Logf(false, "failed to bung (target ID: %s, job ID: %s, gh_run_id: %d, gh_job_id: %d): %+v", job.TargetID, job.UUID, runID2, jobID2, err)
244+
}
211245

212246
if errors.Is(err, ErrInvalidLabel) {
213-
logger.Logf(false, "invalid argument. so will delete (job ID: %s)", job.UUID)
247+
if extractErr != nil {
248+
logger.Logf(false, "invalid argument. so will delete (job ID: %s)", job.UUID)
249+
} else {
250+
logger.Logf(false, "invalid argument. so will delete (job ID: %s, gh_run_id: %d, gh_job_id: %d)", job.UUID, runID2, jobID2)
251+
}
214252
if err := s.ds.DeleteJob(ctx, job.UUID); err != nil {
215253
logger.Logf(false, "failed to delete job: %+v\n", err)
216254

@@ -294,7 +332,12 @@ func (s *Starter) ProcessJob(ctx context.Context, job datastore.Job) error {
294332

295333
// bung is start runner, like a pistol! :)
296334
func (s *Starter) bung(ctx context.Context, job datastore.Job, target datastore.Target) (string, string, string, datastore.ResourceType, error) {
297-
logger.Logf(false, "start create instance (job: %s)", job.UUID)
335+
runID, jobID, extractErr := extractWorkflowIDs(job)
336+
if extractErr != nil {
337+
logger.Logf(false, "start create instance (job: %s)", job.UUID)
338+
} else {
339+
logger.Logf(false, "start create instance (job: %s, gh_run_id: %d, gh_job_id: %d)", job.UUID, runID, jobID)
340+
}
298341
runnerName := runner.ToName(job.UUID.String())
299342

300343
targetScope := getTargetScope(target, job)
@@ -322,7 +365,11 @@ func (s *Starter) bung(ctx context.Context, job datastore.Job, target datastore.
322365
return "", "", "", datastore.ResourceTypeUnknown, fmt.Errorf("failed to add instance: %w", err)
323366
}
324367

325-
logger.Logf(false, "instance create successfully! (job: %s, cloud ID: %s)", job.UUID, cloudID)
368+
if extractErr != nil {
369+
logger.Logf(false, "instance create successfully! (job: %s, cloud ID: %s)", job.UUID, cloudID)
370+
} else {
371+
logger.Logf(false, "instance create successfully! (job: %s, cloud ID: %s, gh_run_id: %d, gh_job_id: %d)", job.UUID, cloudID, runID, jobID)
372+
}
326373

327374
return cloudID, ipAddress, shoesType, resourceType, nil
328375
}
@@ -445,7 +492,7 @@ func enqueueRescueRun(ctx context.Context, pendingRun datastore.PendingWorkflowR
445492
// Get full installation data from cache
446493
installation, err := gh.GetInstallationByID(ctx, installationID)
447494
if err != nil {
448-
logger.Logf(false, "failed to get installation from cache (installationID: %d), using minimal data: %+v", installationID, err)
495+
logger.Logf(false, "failed to get installation from cache (installationID: %d), using minimal data: %+v", installationID, err)
449496
// Fallback to minimal installation data
450497
installation = &github.Installation{
451498
ID: &installationID,
@@ -461,7 +508,7 @@ func enqueueRescueRun(ctx context.Context, pendingRun datastore.PendingWorkflowR
461508
Name: owner.Name,
462509
}
463510
}
464-
511+
465512
event := &github.WorkflowJobEvent{
466513
WorkflowJob: job,
467514
Action: github.String("queued"),
@@ -508,7 +555,7 @@ func enqueueRescueJob(ctx context.Context, workflowJob *github.WorkflowJobEvent,
508555
gheDomain = fmt.Sprintf("%s://%s", u.Scheme, u.Host)
509556
}
510557

511-
logger.Logf(false, "rescue pending job: (repo: %s, jobID: %d)", *repository.HTMLURL, workflowJob.WorkflowJob.GetID())
558+
logger.Logf(false, "rescue pending job: (repo: %s, gh_run_id: %d, gh_job_id: %d)", *repository.HTMLURL, workflowJob.WorkflowJob.GetRunID(), workflowJob.WorkflowJob.GetID())
512559
jobID := uuid.NewV4()
513560
job := datastore.Job{
514561
UUID: jobID,

0 commit comments

Comments
 (0)