diff --git a/convex/skills.pendingScanQueue.test.ts b/convex/skills.pendingScanQueue.test.ts new file mode 100644 index 0000000000..3c1c98d412 --- /dev/null +++ b/convex/skills.pendingScanQueue.test.ts @@ -0,0 +1,202 @@ +import { describe, expect, it, vi } from 'vitest' +import { getPendingScanSkillsInternal } from './skills' + +type PendingScanResult = Array<{ + skillId: string + versionId: string | null + sha256hash: string | null + checkCount: number +}> + +type WrappedHandler = { + _handler: (ctx: unknown, args: TArgs) => Promise +} + +const getPendingScanSkillsHandler = ( + getPendingScanSkillsInternal as unknown as WrappedHandler, PendingScanResult> +)._handler + +describe('skills.getPendingScanSkillsInternal', () => { + it('includes unresolved VT records from the oldest slice and skips finalized ones', async () => { + const recentSkills = [ + makeSkill('skills:recent-clean', 'skillVersions:recent-clean', 'scanner.llm.clean'), + makeSkill('skills:recent-malicious', 'skillVersions:recent-malicious', 'scanner.vt.pending'), + ] + const oldestSkills = [ + makeSkill('skills:old-pending', 'skillVersions:old-pending', 'scanner.vt.pending'), + makeSkill('skills:old-stale', 'skillVersions:old-stale', 'scanner.llm.clean'), + makeSkill('skills:old-no-hash', 'skillVersions:old-no-hash', 'scanner.vt.pending'), + ] + + const versions = new Map([ + [ + 'skillVersions:recent-clean', + { _id: 'skillVersions:recent-clean', sha256hash: 'a'.repeat(64), vtAnalysis: { status: 'clean' } }, + ], + [ + 'skillVersions:recent-malicious', + { + _id: 'skillVersions:recent-malicious', + sha256hash: 'b'.repeat(64), + vtAnalysis: { status: 'malicious' }, + }, + ], + [ + 'skillVersions:old-pending', + { _id: 'skillVersions:old-pending', sha256hash: 'c'.repeat(64), vtAnalysis: { status: 'pending' } }, + ], + [ + 'skillVersions:old-stale', + { _id: 'skillVersions:old-stale', sha256hash: 'd'.repeat(64), vtAnalysis: { status: 'stale' } }, + ], + ['skillVersions:old-no-hash', { _id: 'skillVersions:old-no-hash' }], + ]) + + const ctx = { + db: { + query: vi.fn((table: string) => { + if (table !== 'skills') throw new Error(`unexpected table ${table}`) + return { + withIndex: ( + indexName: string, + builder: (q: { eq: (field: string, value: unknown) => unknown }) => unknown, + ) => { + builder({ eq: () => ({}) }) + if (indexName === 'by_active_updated') { + return { + order: () => ({ + take: async () => recentSkills, + }), + } + } + if (indexName === 'by_active_created') { + return { + order: () => ({ + take: async () => oldestSkills, + }), + } + } + throw new Error(`unexpected index ${indexName}`) + }, + } + }), + get: vi.fn(async (id: string) => versions.get(id) ?? null), + }, + } + + const result = await getPendingScanSkillsHandler(ctx, { + limit: 25, + skipRecentMinutes: 0, + }) + + const ids = new Set(result.map((entry) => entry.skillId)) + expect(ids.has('skills:old-pending')).toBe(true) + expect(ids.has('skills:old-stale')).toBe(true) + expect(ids.has('skills:recent-clean')).toBe(false) + expect(ids.has('skills:recent-malicious')).toBe(false) + expect(ids.has('skills:old-no-hash')).toBe(false) + }) + + it('exhaustive mode ignores recent-check suppression for manual backfills', async () => { + const now = Date.now() + const allSkills = [ + makeSkill('skills:recently-checked', 'skillVersions:recently-checked', 'scanner.vt.pending', now), + ] + const versions = new Map([ + [ + 'skillVersions:recently-checked', + { _id: 'skillVersions:recently-checked', sha256hash: 'e'.repeat(64) }, + ], + ]) + + const withIndex = vi.fn( + ( + indexName: string, + builder: (q: { eq: (field: string, value: unknown) => unknown }) => unknown, + ) => { + builder({ eq: () => ({}) }) + if (indexName !== 'by_active_updated') throw new Error(`unexpected index ${indexName}`) + return { + collect: async () => allSkills, + } + }, + ) + + const ctx = { + db: { + query: vi.fn((table: string) => { + if (table !== 'skills') throw new Error(`unexpected table ${table}`) + return { withIndex } + }), + get: vi.fn(async (id: string) => versions.get(id) ?? null), + }, + } + + const result = await getPendingScanSkillsHandler(ctx, { + limit: 25, + skipRecentMinutes: 60, + exhaustive: true, + }) + + expect(result).toHaveLength(1) + expect(result[0]?.skillId).toBe('skills:recently-checked') + }) + + it('does not clamp exhaustive mode to 100 records', async () => { + const allSkills = Array.from({ length: 150 }, (_, i) => + makeSkill(`skills:bulk-${i}`, `skillVersions:bulk-${i}`, 'scanner.vt.pending'), + ) + const versions = new Map( + allSkills.map((skill) => { + const versionId = skill.latestVersionId as string + return [versionId, { _id: versionId, sha256hash: `${String(versionId).slice(-8)}${'f'.repeat(56)}` }] + }), + ) + + const withIndex = vi.fn( + ( + indexName: string, + builder: (q: { eq: (field: string, value: unknown) => unknown }) => unknown, + ) => { + builder({ eq: () => ({}) }) + if (indexName !== 'by_active_updated') throw new Error(`unexpected index ${indexName}`) + return { + collect: async () => allSkills, + } + }, + ) + + const ctx = { + db: { + query: vi.fn((table: string) => { + if (table !== 'skills') throw new Error(`unexpected table ${table}`) + return { withIndex } + }), + get: vi.fn(async (id: string) => versions.get(id) ?? null), + }, + } + + const result = await getPendingScanSkillsHandler(ctx, { + limit: 10000, + exhaustive: true, + skipRecentMinutes: 0, + }) + + expect(result).toHaveLength(150) + }) +}) + +function makeSkill( + id: string, + versionId: string, + moderationReason: string, + scanLastCheckedAt?: number, +) { + return { + _id: id, + moderationStatus: 'active', + moderationReason, + latestVersionId: versionId, + scanLastCheckedAt, + } +} diff --git a/convex/skills.ts b/convex/skills.ts index f2a3e57244..ac8c153ddb 100644 --- a/convex/skills.ts +++ b/convex/skills.ts @@ -1756,19 +1756,47 @@ export const getSkillByIdInternal = internalQuery({ }) export const getPendingScanSkillsInternal = internalQuery({ - args: { limit: v.optional(v.number()), skipRecentMinutes: v.optional(v.number()) }, + args: { + limit: v.optional(v.number()), + skipRecentMinutes: v.optional(v.number()), + exhaustive: v.optional(v.boolean()), + }, handler: async (ctx, args) => { - const limit = clampInt(args.limit ?? 10, 1, 100) - const skipRecentMinutes = args.skipRecentMinutes ?? 60 + const exhaustive = args.exhaustive ?? false + const limit = exhaustive ? Math.max(1, Math.floor(args.limit ?? 10000)) : clampInt(args.limit ?? 10, 1, 100) + const skipRecentMinutes = exhaustive ? 0 : (args.skipRecentMinutes ?? 60) const skipThreshold = Date.now() - skipRecentMinutes * 60 * 1000 - // Use an indexed query and bounded scan to avoid full-table reads under spam/high volume. - const poolSize = Math.min(Math.max(limit * 20, 200), 1000) - const allSkills = await ctx.db - .query('skills') - .withIndex('by_active_updated', (q) => q.eq('softDeletedAt', undefined)) - .order('desc') - .take(poolSize) + let allSkills: Doc<'skills'>[] = [] + if (exhaustive) { + // Used by manual/backfill tooling where fairness matters more than query cost. + allSkills = await ctx.db + .query('skills') + .withIndex('by_active_updated', (q) => q.eq('softDeletedAt', undefined)) + .collect() + } else { + // Mix "most recently updated" with "oldest created" slices so older pending + // items don't starve behind high-churn records. + const poolSize = Math.min(Math.max(limit * 20, 200), 1000) + const [recentSkills, oldestSkills] = await Promise.all([ + ctx.db + .query('skills') + .withIndex('by_active_updated', (q) => q.eq('softDeletedAt', undefined)) + .order('desc') + .take(poolSize), + ctx.db + .query('skills') + .withIndex('by_active_created', (q) => q.eq('softDeletedAt', undefined)) + .order('asc') + .take(poolSize), + ]) + + const deduped = new Map, Doc<'skills'>>() + for (const skill of [...recentSkills, ...oldestSkills]) { + deduped.set(skill._id, skill) + } + allSkills = [...deduped.values()] + } const candidates = allSkills.filter((skill) => { const reason = skill.moderationReason @@ -1783,10 +1811,11 @@ export const getPendingScanSkillsInternal = internalQuery({ ) }) - // Filter out recently checked skills - const skills = candidates.filter( - (s) => !s.scanLastCheckedAt || s.scanLastCheckedAt < skipThreshold, - ) + // Filter out recently checked skills unless caller explicitly disables recency filtering. + const skills = + skipRecentMinutes <= 0 + ? candidates + : candidates.filter((s) => !s.scanLastCheckedAt || s.scanLastCheckedAt < skipThreshold) // Shuffle and take the requested limit (Fisher-Yates) for (let i = skills.length - 1; i > 0; i--) { @@ -1802,10 +1831,13 @@ export const getPendingScanSkillsInternal = internalQuery({ checkCount: number }> = [] + const FINAL_VT_STATUSES = new Set(['clean', 'malicious', 'suspicious']) for (const skill of selected) { const version = skill.latestVersionId ? await ctx.db.get(skill.latestVersionId) : null - // Skip skills where version already has vtAnalysis or lacks sha256hash - if (version?.vtAnalysis || !version?.sha256hash) continue + if (!version?.sha256hash) continue + const vtStatus = version.vtAnalysis?.status?.trim().toLowerCase() + // Keep retrying unresolved VT results (pending/stale/error), but skip finalized outcomes. + if (vtStatus && FINAL_VT_STATUSES.has(vtStatus)) continue results.push({ skillId: skill._id, versionId: version?._id ?? null, diff --git a/convex/vt.ts b/convex/vt.ts index 2838bcb87a..7f236534d4 100644 --- a/convex/vt.ts +++ b/convex/vt.ts @@ -672,6 +672,8 @@ export const backfillPendingScans = internalAction({ internal.skills.getPendingScanSkillsInternal, { limit: 10000, + exhaustive: true, + skipRecentMinutes: 0, }, )