Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 202 additions & 0 deletions convex/skills.pendingScanQueue.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
import { describe, expect, it, vi } from 'vitest'
import { getPendingScanSkillsInternal } from './skills'

/** One queue row as these tests expect the pending-scan query to return it. */
interface PendingScanEntry {
  skillId: string
  versionId: string | null
  sha256hash: string | null
  checkCount: number
}

type PendingScanResult = PendingScanEntry[]

/** Minimal view of a registered query object that exposes its raw handler via `_handler`. */
interface WrappedHandler<TArgs, TResult> {
  _handler: (ctx: unknown, args: TArgs) => Promise<TResult>
}

// Pull the raw handler off the registered query so the tests can call it
// directly with a hand-built ctx.
const { _handler: getPendingScanSkillsHandler } =
  getPendingScanSkillsInternal as unknown as WrappedHandler<Record<string, unknown>, PendingScanResult>

// Exercises the pending-scan queue handler against hand-built `ctx.db` stubs.
// Every stub rejects tables/indexes the test did not set up, so an unexpected
// query shape fails loudly instead of silently returning nothing.
describe('skills.getPendingScanSkillsInternal', () => {
  type IndexRangeBuilder = (q: { eq: (field: string, value: unknown) => unknown }) => unknown

  /**
   * Builds a fake ctx whose `db.query('skills').withIndex(name, builder)` returns
   * the pre-canned query tail registered under `name`, and whose `db.get(id)`
   * resolves documents from `versions` (or `null` when the id is unknown).
   */
  function buildCtx(queryTailsByIndex: Record<string, unknown>, versions: Map<string, unknown>) {
    const tails = new Map(Object.entries(queryTailsByIndex))
    const withIndex = vi.fn((indexName: string, builder: IndexRangeBuilder) => {
      // Invoke the range builder with a no-op `eq` so the handler's callback runs.
      builder({ eq: () => ({}) })
      if (!tails.has(indexName)) throw new Error(`unexpected index ${indexName}`)
      return tails.get(indexName)
    })
    return {
      db: {
        query: vi.fn((table: string) => {
          if (table !== 'skills') throw new Error(`unexpected table ${table}`)
          return { withIndex }
        }),
        get: vi.fn(async (id: string) => versions.get(id) ?? null),
      },
    }
  }

  /** Query tail for the bounded path: `.order(...).take(...)` resolves to `rows`. */
  function orderedSlice(rows: unknown[]) {
    return { order: () => ({ take: async () => rows }) }
  }

  /** Query tail for the exhaustive path: `.collect()` resolves to `rows`. */
  function fullScan(rows: unknown[]) {
    return { collect: async () => rows }
  }

  /**
   * Produces a `[id, doc]` map entry for a version document. `sha256hash` and
   * `vtAnalysis` are only present on the doc when the matching argument is given.
   */
  function versionEntry(id: string, sha256hash?: string, vtStatus?: string): [string, unknown] {
    return [
      id,
      {
        _id: id,
        ...(sha256hash === undefined ? {} : { sha256hash }),
        ...(vtStatus === undefined ? {} : { vtAnalysis: { status: vtStatus } }),
      },
    ]
  }

  it('includes unresolved VT records from the oldest slice and skips finalized ones', async () => {
    const recentSkills = [
      makeSkill('skills:recent-clean', 'skillVersions:recent-clean', 'scanner.llm.clean'),
      makeSkill('skills:recent-malicious', 'skillVersions:recent-malicious', 'scanner.vt.pending'),
    ]
    const oldestSkills = [
      makeSkill('skills:old-pending', 'skillVersions:old-pending', 'scanner.vt.pending'),
      makeSkill('skills:old-stale', 'skillVersions:old-stale', 'scanner.llm.clean'),
      makeSkill('skills:old-no-hash', 'skillVersions:old-no-hash', 'scanner.vt.pending'),
    ]

    const versions = new Map<string, unknown>([
      versionEntry('skillVersions:recent-clean', 'a'.repeat(64), 'clean'),
      versionEntry('skillVersions:recent-malicious', 'b'.repeat(64), 'malicious'),
      versionEntry('skillVersions:old-pending', 'c'.repeat(64), 'pending'),
      versionEntry('skillVersions:old-stale', 'd'.repeat(64), 'stale'),
      // No hash and no VT analysis at all.
      versionEntry('skillVersions:old-no-hash'),
    ])

    const ctx = buildCtx(
      {
        by_active_updated: orderedSlice(recentSkills),
        by_active_created: orderedSlice(oldestSkills),
      },
      versions,
    )

    const result = await getPendingScanSkillsHandler(ctx, {
      limit: 25,
      skipRecentMinutes: 0,
    })

    const returnedIds = new Set(result.map((entry) => entry.skillId))
    const expectedPresence: Array<[string, boolean]> = [
      ['skills:old-pending', true],
      ['skills:old-stale', true],
      ['skills:recent-clean', false],
      ['skills:recent-malicious', false],
      ['skills:old-no-hash', false],
    ]
    for (const [skillId, shouldBeQueued] of expectedPresence) {
      expect(returnedIds.has(skillId)).toBe(shouldBeQueued)
    }
  })

  it('exhaustive mode ignores recent-check suppression for manual backfills', async () => {
    // The only skill was "checked" at the current instant, inside the 60-minute
    // window passed to the handler below.
    const checkedAt = Date.now()
    const allSkills = [
      makeSkill('skills:recently-checked', 'skillVersions:recently-checked', 'scanner.vt.pending', checkedAt),
    ]
    const versions = new Map<string, unknown>([
      versionEntry('skillVersions:recently-checked', 'e'.repeat(64)),
    ])

    const ctx = buildCtx({ by_active_updated: fullScan(allSkills) }, versions)

    const result = await getPendingScanSkillsHandler(ctx, {
      limit: 25,
      skipRecentMinutes: 60,
      exhaustive: true,
    })

    expect(result).toHaveLength(1)
    expect(result[0]?.skillId).toBe('skills:recently-checked')
  })

  it('does not clamp exhaustive mode to 100 records', async () => {
    const allSkills = Array.from({ length: 150 }, (_, i) =>
      makeSkill(`skills:bulk-${i}`, `skillVersions:bulk-${i}`, 'scanner.vt.pending'),
    )
    // Each version gets a distinct 64-character hash: the last 8 characters of
    // its id followed by 56 'f' characters.
    const versions = new Map<string, unknown>(
      allSkills.map(({ latestVersionId }) =>
        versionEntry(latestVersionId, `${latestVersionId.slice(-8)}${'f'.repeat(56)}`),
      ),
    )

    const ctx = buildCtx({ by_active_updated: fullScan(allSkills) }, versions)

    const result = await getPendingScanSkillsHandler(ctx, {
      limit: 10000,
      exhaustive: true,
      skipRecentMinutes: 0,
    })

    expect(result).toHaveLength(150)
  })
})

/**
 * Builds a minimal skill fixture for the pending-scan tests.
 *
 * @param id - Value stored as the fixture's `_id`.
 * @param versionId - Value stored as `latestVersionId`.
 * @param moderationReason - Value stored as `moderationReason`.
 * @param scanLastCheckedAt - Optional timestamp; the key is always present and
 *   is `undefined` when the argument is omitted.
 * @returns A plain object with `moderationStatus` fixed to `'active'`.
 */
function makeSkill(
  id: string,
  versionId: string,
  moderationReason: string,
  scanLastCheckedAt?: number,
) {
  const fixture = {
    _id: id,
    moderationStatus: 'active',
    moderationReason: moderationReason,
    latestVersionId: versionId,
    scanLastCheckedAt: scanLastCheckedAt,
  }
  return fixture
}
64 changes: 48 additions & 16 deletions convex/skills.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1756,19 +1756,47 @@ export const getSkillByIdInternal = internalQuery({
})

export const getPendingScanSkillsInternal = internalQuery({
args: { limit: v.optional(v.number()), skipRecentMinutes: v.optional(v.number()) },
args: {
limit: v.optional(v.number()),
skipRecentMinutes: v.optional(v.number()),
exhaustive: v.optional(v.boolean()),
},
handler: async (ctx, args) => {
const limit = clampInt(args.limit ?? 10, 1, 100)
const skipRecentMinutes = args.skipRecentMinutes ?? 60
const exhaustive = args.exhaustive ?? false
const limit = exhaustive ? Math.max(1, Math.floor(args.limit ?? 10000)) : clampInt(args.limit ?? 10, 1, 100)
const skipRecentMinutes = exhaustive ? 0 : (args.skipRecentMinutes ?? 60)
const skipThreshold = Date.now() - skipRecentMinutes * 60 * 1000

// Use an indexed query and bounded scan to avoid full-table reads under spam/high volume.
const poolSize = Math.min(Math.max(limit * 20, 200), 1000)
const allSkills = await ctx.db
.query('skills')
.withIndex('by_active_updated', (q) => q.eq('softDeletedAt', undefined))
.order('desc')
.take(poolSize)
let allSkills: Doc<'skills'>[] = []
if (exhaustive) {
// Used by manual/backfill tooling where fairness matters more than query cost.
allSkills = await ctx.db
.query('skills')
.withIndex('by_active_updated', (q) => q.eq('softDeletedAt', undefined))
.collect()
} else {
// Mix "most recently updated" with "oldest created" slices so older pending
// items don't starve behind high-churn records.
const poolSize = Math.min(Math.max(limit * 20, 200), 1000)
const [recentSkills, oldestSkills] = await Promise.all([
ctx.db
.query('skills')
.withIndex('by_active_updated', (q) => q.eq('softDeletedAt', undefined))
.order('desc')
.take(poolSize),
ctx.db
.query('skills')
.withIndex('by_active_created', (q) => q.eq('softDeletedAt', undefined))
.order('asc')
.take(poolSize),
])

const deduped = new Map<Id<'skills'>, Doc<'skills'>>()
for (const skill of [...recentSkills, ...oldestSkills]) {
deduped.set(skill._id, skill)
}
allSkills = [...deduped.values()]
}

const candidates = allSkills.filter((skill) => {
const reason = skill.moderationReason
Expand All @@ -1783,10 +1811,11 @@ export const getPendingScanSkillsInternal = internalQuery({
)
})

// Filter out recently checked skills
const skills = candidates.filter(
(s) => !s.scanLastCheckedAt || s.scanLastCheckedAt < skipThreshold,
)
// Filter out recently checked skills unless caller explicitly disables recency filtering.
const skills =
skipRecentMinutes <= 0
? candidates
: candidates.filter((s) => !s.scanLastCheckedAt || s.scanLastCheckedAt < skipThreshold)

// Shuffle and take the requested limit (Fisher-Yates)
for (let i = skills.length - 1; i > 0; i--) {
Expand All @@ -1802,10 +1831,13 @@ export const getPendingScanSkillsInternal = internalQuery({
checkCount: number
}> = []

const FINAL_VT_STATUSES = new Set(['clean', 'malicious', 'suspicious'])
for (const skill of selected) {
const version = skill.latestVersionId ? await ctx.db.get(skill.latestVersionId) : null
// Skip skills where version already has vtAnalysis or lacks sha256hash
if (version?.vtAnalysis || !version?.sha256hash) continue
if (!version?.sha256hash) continue
const vtStatus = version.vtAnalysis?.status?.trim().toLowerCase()
// Keep retrying unresolved VT results (pending/stale/error), but skip finalized outcomes.
if (vtStatus && FINAL_VT_STATUSES.has(vtStatus)) continue
results.push({
skillId: skill._id,
versionId: version?._id ?? null,
Expand Down
2 changes: 2 additions & 0 deletions convex/vt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,8 @@ export const backfillPendingScans = internalAction({
internal.skills.getPendingScanSkillsInternal,
{
limit: 10000,
exhaustive: true,
skipRecentMinutes: 0,
},
)

Expand Down