Skip to content

Commit a23b22a

Browse files
Add a github path reconciler module. (#1159)
This module basically sets up all of the scaffolding to reconcile files within a github repository. There are two main triggers: 1. A periodic cron (every X hours) 2. A push event listener (immediate) When these run, they enumerate either all of the files in the repo, or all of the changed files in the push. We apply the file patterns to each of them, and when they match, we take the substring that the pattern captures as the key. The intent of this is to be able to match e.g. `images/foo` when `images/foo/bar/baz.blah` changes by having a pattern like: ``` (images/[^/]+)/.* ``` There are a couple funky interactions between this and some of the current workqueue semantics, but I am going to fix that in a parallel change. --------- Signed-off-by: Matt Moore <[email protected]> Co-authored-by: octo-sts[bot] <157150467+octo-sts[bot]@users.noreply.github.com>
1 parent d87f82d commit a23b22a

File tree

9 files changed

+1454
-0
lines changed

9 files changed

+1454
-0
lines changed

modules/github-path-reconciler/README.md

Lines changed: 256 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
/*
2+
Copyright 2025 Chainguard, Inc.
3+
SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package main
7+
8+
import (
9+
"context"
10+
"fmt"
11+
"os/signal"
12+
"regexp"
13+
"strings"
14+
"syscall"
15+
16+
"github.com/chainguard-dev/clog"
17+
_ "github.com/chainguard-dev/clog/gcp/init"
18+
"github.com/chainguard-dev/terraform-infra-common/modules/github-path-reconciler/internal/patterns"
19+
"github.com/chainguard-dev/terraform-infra-common/pkg/githubreconciler"
20+
"github.com/chainguard-dev/terraform-infra-common/pkg/httpmetrics"
21+
"github.com/chainguard-dev/terraform-infra-common/pkg/workqueue"
22+
cloudevents "github.com/cloudevents/sdk-go/v2"
23+
"github.com/google/go-github/v75/github"
24+
"github.com/sethvargo/go-envconfig"
25+
"golang.org/x/oauth2"
26+
"golang.org/x/sync/errgroup"
27+
)
28+
29+
type config struct {
30+
Port int `env:"PORT,default=8080"`
31+
32+
// Workqueue configuration
33+
WorkqueueAddr string `env:"WORKQUEUE_ADDR,required"`
34+
35+
// Path patterns (JSON array)
36+
PathPatterns string `env:"PATH_PATTERNS,required"`
37+
38+
// Octo STS identity
39+
OctoIdentity string `env:"OCTO_IDENTITY,required"`
40+
}
41+
42+
func main() {
43+
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
44+
defer cancel()
45+
46+
go httpmetrics.ServeMetrics()
47+
defer httpmetrics.SetupTracer(ctx)()
48+
49+
var cfg config
50+
if err := envconfig.Process(ctx, &cfg); err != nil {
51+
clog.FatalContextf(ctx, "Failed to process environment: %v", err)
52+
}
53+
54+
// Parse path patterns
55+
pats, err := patterns.Parse(cfg.PathPatterns)
56+
if err != nil {
57+
clog.FatalContextf(ctx, "Failed to parse path patterns: %v", err)
58+
}
59+
60+
// Set up workqueue client
61+
wqClient, err := workqueue.NewWorkqueueClient(ctx, cfg.WorkqueueAddr)
62+
if err != nil {
63+
clog.FatalContextf(ctx, "Failed to create workqueue client: %v", err)
64+
}
65+
defer wqClient.Close()
66+
67+
clientCache := githubreconciler.NewClientCache(func(ctx context.Context, org, repo string) (oauth2.TokenSource, error) {
68+
return githubreconciler.NewRepoTokenSource(ctx, cfg.OctoIdentity, org, repo), nil
69+
})
70+
71+
handler := &pushHandler{
72+
clientCache: clientCache,
73+
wqClient: wqClient,
74+
patterns: pats,
75+
}
76+
77+
// Set up Cloud Events receiver
78+
ceClient, err := cloudevents.NewClientHTTP(cloudevents.WithPort(cfg.Port))
79+
if err != nil {
80+
clog.FatalContextf(ctx, "Failed to create CloudEvents client: %v", err)
81+
}
82+
83+
clog.InfoContextf(ctx, "Starting push listener on port %d", cfg.Port)
84+
if err := ceClient.StartReceiver(ctx, handler.handlePushEvent); err != nil {
85+
clog.FatalContextf(ctx, "Failed to start receiver: %v", err)
86+
}
87+
}
88+
89+
type pushHandler struct {
90+
clientCache *githubreconciler.ClientCache
91+
wqClient workqueue.Client
92+
patterns []*regexp.Regexp
93+
}
94+
95+
func (h *pushHandler) handlePushEvent(ctx context.Context, event cloudevents.Event) error {
96+
log := clog.FromContext(ctx)
97+
98+
// Log all events we receive for debugging
99+
log.Infof("Received event: type=%s, source=%s, subject=%s", event.Type(), event.Source(), event.Subject())
100+
101+
// Filter for push events in code
102+
if event.Type() != "dev.chainguard.github.push" {
103+
log.Infof("Ignoring non-push event: %s", event.Type())
104+
return nil
105+
}
106+
107+
// Unwrap the event envelope - the trampoline wraps the GitHub payload
108+
var envelope struct {
109+
Body github.PushEvent `json:"body"`
110+
}
111+
if err := event.DataAs(&envelope); err != nil {
112+
return fmt.Errorf("failed to parse event envelope: %w", err)
113+
}
114+
115+
// Use the push event from the envelope body
116+
pushEvent := envelope.Body
117+
118+
owner := pushEvent.GetRepo().GetOwner().GetLogin()
119+
repo := pushEvent.GetRepo().GetName()
120+
ref := pushEvent.GetRef()
121+
before := pushEvent.GetBefore()
122+
after := pushEvent.GetAfter()
123+
defaultBranch := pushEvent.GetRepo().GetDefaultBranch()
124+
125+
log = log.With(
126+
"owner", owner,
127+
"repo", repo,
128+
"ref", ref,
129+
"before", before,
130+
"after", after,
131+
"default_branch", defaultBranch,
132+
)
133+
ctx = clog.WithLogger(ctx, log)
134+
135+
// Extract branch name from ref (refs/heads/main -> main)
136+
branch := strings.TrimPrefix(ref, "refs/heads/")
137+
138+
// Only process pushes to the default branch
139+
if branch != defaultBranch {
140+
log.Infof("Ignoring push to non-default branch %q (default is %q)", branch, defaultBranch)
141+
return nil
142+
}
143+
144+
log.Infof("Processing push event for %s/%s on default branch %q", owner, repo, defaultBranch)
145+
146+
// Get GitHub client
147+
ghClient, err := h.clientCache.Get(ctx, owner, repo)
148+
if err != nil {
149+
return fmt.Errorf("failed to get GitHub client: %w", err)
150+
}
151+
152+
// Use the GitHub API to compare commits to get all changed files
153+
// This handles all merge strategies correctly (merge commits, squash, rebase)
154+
comparison, _, err := ghClient.Repositories.CompareCommits(ctx, owner, repo, before, after, &github.ListOptions{})
155+
if err != nil {
156+
return fmt.Errorf("failed to compare commits: %w", err)
157+
}
158+
159+
// Collect all changed files from the comparison
160+
changedFiles := make(map[string]struct{})
161+
for _, file := range comparison.Files {
162+
changedFiles[file.GetFilename()] = struct{}{}
163+
}
164+
165+
log.Infof("Processing %d changed files", len(changedFiles))
166+
167+
// Extract keys from changed files
168+
keySet := make(map[string]struct{})
169+
for file := range changedFiles {
170+
for _, regex := range h.patterns {
171+
matches := regex.FindStringSubmatch(file)
172+
if len(matches) <= 1 {
173+
continue
174+
}
175+
176+
capturedPath := matches[1] // First capture group
177+
178+
// Build resource URL using the default branch
179+
url := fmt.Sprintf("https://github.com/%s/%s/blob/%s/%s", owner, repo, defaultBranch, capturedPath)
180+
181+
// Add to key set (deduplicates automatically)
182+
keySet[url] = struct{}{}
183+
break // Only match first pattern
184+
}
185+
}
186+
187+
log.Infof("Enqueueing %d unique keys", len(keySet))
188+
189+
// Enqueue all unique keys
190+
eg, egCtx := errgroup.WithContext(ctx)
191+
192+
for url := range keySet {
193+
url := url // capture for goroutine
194+
eg.Go(func() error {
195+
_, err := h.wqClient.Process(egCtx, &workqueue.ProcessRequest{
196+
Key: url,
197+
Priority: 100, // Process push events immediately
198+
})
199+
if err != nil {
200+
clog.ErrorContextf(egCtx, "Failed to process key %q: %v", url, err)
201+
return err
202+
}
203+
204+
clog.InfoContextf(egCtx, "Enqueued %q", url)
205+
return nil
206+
})
207+
}
208+
209+
if err := eg.Wait(); err != nil {
210+
return fmt.Errorf("failed to enqueue all keys: %w", err)
211+
}
212+
213+
return nil
214+
}

0 commit comments

Comments
 (0)