Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions run/easylabel.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ async function runLabelOpInitializeScan() {
)
.confluenceByParallel()
.map(async (issue) => {
// processIssueComment({issue});
console.log(`+issue ${issue.html_url} with ${issue.comments} comments`);
if (!issue.comments) return;
await pageFlow(1, async (page, per_page = 100) => {
Expand Down
2 changes: 1 addition & 1 deletion run/gh-bugcop/gh-bugcop.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ async function processIssue(issue: GH["issue"]) {
.filter((e): e is NonNullable<typeof e> => e !== null)
.toArray();
tlog("Found " + labelEvents.length + " unlabeled/labeled/commented events");
await saveTask({ timeline: labelEvents as any });
await saveTask({ timeline: labelEvents });

function lastLabeled(labelName: string) {
return labelEvents
Expand Down
196 changes: 161 additions & 35 deletions run/index.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
import KeyvSqlite from "@keyv/sqlite";
import type { WebhookEventMap } from "@octokit/webhooks-types";
import DIE from "@snomiao/die";
import crypto from "crypto";
import Keyv from "keyv";
import { tap } from "rambda";
import sflow, { pageFlow } from "sflow";
import { match, P } from "ts-pattern";
import { type UnionToIntersection } from "type-fest";
import { gh, type GH } from "../src/gh/index.js";
import { ghc } from "../src/ghc.js";
import { parseGithubRepoUrl } from "../src/parseOwnerRepo.js";
import { processIssueCommentForLableops } from "./easylabel";
import type { WEBHOOK_EVENT } from "./github-webhook-event-type";
export const REPOLIST = [
"https://github.com/Comfy-Org/Comfy-PR",
"https://github.com/comfyanonymous/ComfyUI",
Expand Down Expand Up @@ -46,14 +47,17 @@ type Webhook =
class RepoEventMonitor {
private monitorState = new Map<string, RepoMonitorState>();
private stateCache: Keyv<RepoMonitorState>;
private commentCache: Keyv<Map<number, string>>; // Map of comment ID to updated_at timestamp
private pollingRepos = new Set<string>();
private pollInterval = 30000; // 30 seconds
private commentPollInterval = 5000; // 5 seconds for comment polling
private webhookSetupComplete = false;

constructor() {
// Initialize SQLite cache
const sqlite = new KeyvSqlite("gh-service/state.sqlite");
this.stateCache = new Keyv({ store: sqlite });
this.commentCache = new Keyv({ store: new KeyvSqlite("gh-service/comment-cache.sqlite") });

// Initialize state for each repo
for (const repoUrl of REPOLIST) {
Expand Down Expand Up @@ -119,32 +123,30 @@ class RepoEventMonitor {
if (!this.verifyWebhookSignature(body, signature)) return new Response("Unauthorized", { status: 401 });

const payload = JSON.parse(body);
this.handleWebhookEvent({ type: event, payload } as WEBHOOK_EVENT);
this.handleWebhookEvent({ [event]: payload } as WebhookEventMap);
return new Response("OK");
}

private async handleWebhookEvent(event: WEBHOOK_EVENT) {
private async handleWebhookEvent(eventMap: WebhookEventMap) {
const timestamp = this.formatTimestamp();
// const repo = event.payload.repository;
// const repoName = repo ? `${repo.owner.login}/${repo.name}` : "unknown";

match(event)
match(eventMap)
// .with({ type: "issues" }, async ({ payload: { issue } }) =>
// processIssueCommentForLableops({ issue: issue as GH["issue"], comment: comment as GH["issue-comment"] }),
// )
.with({ type: "issue_comment" }, async ({ payload: { issue, comment } }) =>
.with({ issue_comment: P.select() }, async ({ issue, comment }) =>
processIssueCommentForLableops({ issue: issue as GH["issue"], comment: comment as GH["issue-comment"] }),
)
.otherwise(() => null);

// match core-important in +Core-Important
match(event)
.with({ payload: { issue: { html_url: P.string }, comment: { body: P.string } } }, async ({ type, payload }) => {
const { issue, comment, action } = payload;
const fullEvent = `${type}:${action}` as const;
console.log(type, comment.body);
return { issueUrl: issue.html_url, body: comment.body };
})
.otherwise(() => null);
// match(eventMap)
// .with({ payload: { issue: { html_url: P.string }, comment: { body: P.string } } }, async ({ type, payload }) => {
// const { issue, comment, action } = payload;
// const fullEvent = `${type}:${action}` as const;
// console.log(type, comment.body);
// return { issueUrl: issue.html_url, body: comment.body };
// })
// .otherwise(() => null);

// match(event)
// .with({ type: "pull_request" }, ({ type, payload }) => payload.comment.body)
Expand Down Expand Up @@ -273,23 +275,25 @@ class RepoEventMonitor {
}

// Create webhook
await gh.repos.createWebhook({
owner,
repo,
config: {
url: WEBHOOK_URL,
content_type: "json",
secret: WEBHOOK_SECRET,
},
events: [
"issues",
"pull_request",
"issue_comment",
"pull_request_review",
"pull_request_review_comment",
"label",
],
});
await gh.repos.createWebhook(
tap(console.log, {
owner,
repo,
config: {
url: WEBHOOK_URL,
content_type: "json",
secret: WEBHOOK_SECRET,
},
events: [
"issues",
"pull_request",
"issue_comment",
"pull_request_review",
"pull_request_review_comment",
"label",
],
}),
);

console.log(`[${this.formatTimestamp()}] ✅ Webhook created for ${owner}/${repo}`);
} catch (error: any) {
Expand All @@ -316,12 +320,19 @@ class RepoEventMonitor {

console.log(`[${this.formatTimestamp()}] Monitoring repos: ${REPOLIST.join(", ")}`);

// Start comment polling for all repos (5 second interval)
console.log(`[${this.formatTimestamp()}] Starting comment polling (5s interval) for recent comments...`);
setInterval(() => {
this.pollRecentComments();
}, this.commentPollInterval);

// Initial comment check
await this.pollRecentComments();

if (WEBHOOK_URL) {
console.log(`[${this.formatTimestamp()}] Using webhooks for real-time notifications`);
await this.setupWebhooks();

// TODO: polling way

// // Start polling for repos that couldn't set up webhooks
if (this.pollingRepos.size > 0) {
console.log(
Expand All @@ -346,6 +357,121 @@ class RepoEventMonitor {
}
}

private async pollRecentComments() {
// Check for comments in the last 5 minutes
const since = new Date(Date.now() - 5 * 60 * 1000).toISOString();

for (const repoUrl of REPOLIST) {
// Listing issue comments for recent 5min
console.log(`[${this.formatTimestamp()}] Checking recent comments for ${repoUrl}`);
try {
const { owner, repo } = this.parseRepoUrl(repoUrl);
const cacheKey = `${owner}/${repo}`;

// Get cached comment timestamps
const cachedComments = (await this.commentCache.get(cacheKey)) || new Map<number, string>();

// List recent comments for the repository
const { data: comments } = await gh.issues.listCommentsForRepo({
owner,
repo,
since,
sort: "updated",
direction: "desc",
per_page: 100,
});

const newCachedComments = new Map<number, string>();

for (const comment of comments) {
newCachedComments.set(comment.id, comment.updated_at);

const previousUpdatedAt = cachedComments.get(comment.id);

if (!previousUpdatedAt) {
// New comment - mock issue_comment.created event
console.log(
`[${this.formatTimestamp()}] 💬 NEW COMMENT DETECTED: ${owner}/${repo} #${comment.issue_url?.split("/").pop()} - Comment ID: ${comment.id}`,
);

// Fetch the issue data for the mock event
const issueNumber = parseInt(comment.issue_url?.split("/").pop() || "0");
if (issueNumber) {
try {
const { data: issue } = await gh.issues.get({ owner, repo, issue_number: issueNumber });

// Create mock webhook event for new comment

// Handle the mock event
await this.handleWebhookEvent(
// @ts-ignore TODO fix type
tap((e) => console.log("mocked-webhook-event", e), {
issue_comment: {
action: "created",
issue: { ...issue } satisfies WebhookEventMap["issue_comment"]["issue"],
comment: comment satisfies WebhookEventMap["issue_comment"]["comment"],
repository: {
owner: { login: owner },
name: repo,
full_name: `${owner}/${repo}`,
} satisfies WebhookEventMap["issue_comment"]["repository"],
sender: comment.user! satisfies WebhookEventMap["issue_comment"]["sender"],
},
Comment on lines +407 to +419
Copy link
Preview

Copilot AI Sep 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using @ts-ignore defeats the purpose of enabling noImplicitAny. The TODO comment indicates this should be properly typed. Consider creating a proper type assertion or helper function to construct the webhook event with correct typing."

Copilot uses AI. Check for mistakes.

} satisfies WebhookEventMap),
);
} catch (error) {
console.error(`[${this.formatTimestamp()}] Error fetching issue for comment:`, error);
}
}
} else if (previousUpdatedAt !== comment.updated_at) {
// Updated comment - mock issue_comment.edited event
console.log(
`[${this.formatTimestamp()}] ✏️ COMMENT UPDATED: ${owner}/${repo} #${comment.issue_url?.split("/").pop()} - Comment ID: ${comment.id}`,
);

// Fetch the issue data for the mock event
const issueNumber = parseInt(comment.issue_url?.split("/").pop() || "0");

if (issueNumber) {
try {
const { data: issue } = await gh.issues.get({ owner, repo, issue_number: issueNumber });
// Handle the mock event
// @ts-ignore TODO fix type
await this.handleWebhookEvent(
tap(console.debug, {
issue_comment: {
action: "edited",
issue,
comment,
repository: {
owner: { login: owner },
name: repo,
full_name: `${owner}/${repo}`,
},
sender: comment.user!,
changes: {
body: {
from: "previous content", // We don't have the old content, but the webhook handler doesn't use it
Copy link
Preview

Copilot AI Sep 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hard-coded placeholder string creates potential confusion. Consider using a more explicit placeholder like 'UNKNOWN_PREVIOUS_CONTENT' or define a constant to make the intent clearer."

Copilot uses AI. Check for mistakes.

},
},
},
} satisfies WebhookEventMap),
);
Comment on lines +439 to +459
Copy link
Preview

Copilot AI Sep 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another @ts-ignore usage that conflicts with the noImplicitAny goal. This duplicated pattern suggests creating a reusable helper function to construct mock webhook events with proper typing would be beneficial."

Copilot uses AI. Check for mistakes.

} catch (error) {
console.error(`[${this.formatTimestamp()}] Error fetching issue for comment:`, error);
}
}
}
}

// Update cache with new comment timestamps
await this.commentCache.set(cacheKey, newCachedComments);
} catch (error) {
console.error(`[${this.formatTimestamp()}] Error polling comments for ${repoUrl}:`, error);
}
}
}

private async checkPollingRepos() {
sflow(this.pollingRepos).map((html_url) => {
pageFlow(1, async (page, per_page = 100) => {
Expand Down
6 changes: 3 additions & 3 deletions src/WorkerInstances.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ export type WorkerInstance = {
};

const k = "COMFY_PR_WorkerInstanceKey";
type g = typeof globalThis & { [k]: any };
const g = globalThis as typeof globalThis & { [k]: string };
function getWorkerInstanceId() {
// ensure only one instance
if (!(global as any as g)[k])
if (!g[k])
defer(async function () {
await Promise.all([postWorkerHeartBeatLoop(), watchWorkerInstancesLoop()]);
});
const instanceId = ((global as any as g)[k] ??= createInstanceId());
const instanceId = (g[k] ??= createInstanceId());
return instanceId;
}
export const WorkerInstances = db.collection<WorkerInstance>("WorkerInstances");
Expand Down
1 change: 1 addition & 0 deletions tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"esModuleInterop": true,
"resolvePackageJsonImports": true,
"resolvePackageJsonExports": true,
"noImplicitAny": true,
// Best practices
"strict": true,
"strictNullChecks": true,
Expand Down