From 74f746f4373090284896bdf9b916e9d31d1140a6 Mon Sep 17 00:00:00 2001 From: Alessio Gravili Date: Thu, 12 Jun 2025 14:06:23 -0700 Subject: [PATCH 01/18] initial API --- .../payload/src/queues/config/types/index.ts | 27 +++++++++++++++++++ .../src/queues/config/types/taskTypes.ts | 5 ++++ .../src/queues/config/types/workflowTypes.ts | 5 ++++ test/queues/config.ts | 10 +++++++ 4 files changed, 47 insertions(+) diff --git a/packages/payload/src/queues/config/types/index.ts b/packages/payload/src/queues/config/types/index.ts index 9d261113c0a..f80211e3fdd 100644 --- a/packages/payload/src/queues/config/types/index.ts +++ b/packages/payload/src/queues/config/types/index.ts @@ -127,3 +127,30 @@ export type JobsConfig = { */ workflows?: WorkflowConfig[] } + +export type ScheduleConfig = { + /** + * The cron for scheduling the job. + * + * @example + * ┌───────────── (optional) second (0 - 59) + * │ ┌───────────── minute (0 - 59) + * │ │ ┌───────────── hour (0 - 23) + * │ │ │ ┌───────────── day of the month (1 - 31) + * │ │ │ │ ┌───────────── month (1 - 12) + * │ │ │ │ │ ┌───────────── day of the week (0 - 6) (Sunday to Saturday) + * │ │ │ │ │ │ + * │ │ │ │ │ │ + * - '* 0 * * * *' every hour at minute 0 + * - '* 0 0 * * *' daily at midnight + * - '* 0 0 * * 0' weekly at midnight on Sundays + * - '* 0 0 1 * *' monthly at midnight on the 1st day of the month + * - '* 0/5 * * * *' every 5 minutes + * - '* * * * * *' every second + */ + cron: string + /** + * Queue to which the scheduled job will be added. + */ + queue: string +} diff --git a/packages/payload/src/queues/config/types/taskTypes.ts b/packages/payload/src/queues/config/types/taskTypes.ts index 0a698905a00..61d8180105e 100644 --- a/packages/payload/src/queues/config/types/taskTypes.ts +++ b/packages/payload/src/queues/config/types/taskTypes.ts @@ -1,4 +1,5 @@ import type { Field, PayloadRequest, StringKeyOf, TypedJobs } from '../../../index.js' +import type { ScheduleConfig } from './index.js' import type { BaseJob, RunningJob, RunningJobSimple, SingleTaskStatus } from './workflowTypes.js' export type TaskInputOutput = { @@ -233,6 +234,10 @@ export type TaskConfig< * @default By default, tasks are not retried and `retries` is `undefined`. */ retries?: number | RetryConfig | undefined + /** + * Allows automatically scheduling this task to run regularly at a specified interval. + */ + schedule?: ScheduleConfig[] /** * Define a slug-based name for this job. This slug needs to be unique among both tasks and workflows. */ diff --git a/packages/payload/src/queues/config/types/workflowTypes.ts b/packages/payload/src/queues/config/types/workflowTypes.ts index 823dff112f8..ddad598f35a 100644 --- a/packages/payload/src/queues/config/types/workflowTypes.ts +++ b/packages/payload/src/queues/config/types/workflowTypes.ts @@ -1,6 +1,7 @@ import type { Field } from '../../../fields/config/types.js' import type { PayloadRequest, StringKeyOf, TypedCollection, TypedJobs } from '../../../index.js' import type { TaskParent } from '../../operations/runJobs/runJob/getRunTaskFunction.js' +import type { ScheduleConfig } from './index.js' import type { RetryConfig, RunInlineTaskFunction, @@ -130,6 +131,10 @@ export type WorkflowConfig { + return { + output: {}, + } + }, + }, { retries: 2, slug: 'UpdatePost', From fb81dc83e71039399328e834f2a20008f257f563 Mon Sep 17 00:00:00 2001 From: Alessio Gravili Date: Thu, 12 Jun 2025 14:43:34 -0700 Subject: [PATCH 02/18] simplify --- .../payload/src/queues/restEndpointRun.ts | 37 ++++++++++--------- test/queues/config.ts | 5 ++- 2 files changed, 22 insertions(+), 20 deletions(-) diff --git a/packages/payload/src/queues/restEndpointRun.ts b/packages/payload/src/queues/restEndpointRun.ts index 888e161c344..9cc03f719b8 100644 --- a/packages/payload/src/queues/restEndpointRun.ts +++ b/packages/payload/src/queues/restEndpointRun.ts @@ -2,21 +2,6 @@ import type { Endpoint, SanitizedConfig } from '../config/types.js' import { runJobs, type RunJobsArgs } from './operations/runJobs/index.js' -const configHasJobs = (config: SanitizedConfig): boolean => { - if (!config.jobs) { - return false - } - - if (config.jobs.tasks?.length > 0) { - return true - } - if (config.jobs.workflows?.length > 0) { - return true - } - - return false -} - export const runJobsEndpoint: Endpoint = { handler: async (req) => { if (!configHasJobs(req.payload.config)) { @@ -41,10 +26,12 @@ export const runJobsEndpoint: Endpoint = { const { limit, queue } = req.query + await handleSchedules() + const runJobsArgs: RunJobsArgs = { queue: 'default', req, - // We are checking access above, so we can override it here + // Access is validated above, so it's safe to override here overrideAccess: true, } @@ -52,8 +39,9 @@ export const runJobsEndpoint: Endpoint = { runJobsArgs.queue = queue } - if (typeof limit !== 'undefined') { - runJobsArgs.limit = Number(limit) + const parsedLimit = Number(limit) + if (!isNaN(parsedLimit)) { + runJobsArgs.limit = parsedLimit } let noJobsRemaining = false @@ -91,3 +79,16 @@ export const runJobsEndpoint: Endpoint = { method: 'get', path: '/run', } + +const configHasJobs = (config: SanitizedConfig): boolean => { + return Boolean(config.jobs?.tasks?.length || config.jobs?.workflows?.length) +} + +/** + * On vercel, we cannot auto-schedule jobs using a Cron - instead, we'll use this same endpoint that can + * also be called from Vercel Cron for auto-running jobs. + * + * The benefit of doing it like this instead of a separate endpoint is that we can run jobs immediately + * after they are scheduled + */ +async function handleSchedules() {} diff --git a/test/queues/config.ts b/test/queues/config.ts index 1d7753bc8b3..93820759a55 100644 --- a/test/queues/config.ts +++ b/test/queues/config.ts @@ -102,7 +102,7 @@ export default buildConfigWithDefaults({ // Every second cron: '* * * * * *', limit: 100, - queue: 'autorunSecond', // name of the queue + queue: 'autorunSecond', }, // add as many cron jobs as you want ], @@ -123,9 +123,10 @@ export default buildConfigWithDefaults({ }, tasks: [ { - schedule: [{ cron: '* * * * * *', queue: 'schedules' }], + schedule: [{ cron: '* * * * * *', queue: 'autorunSecond' }], slug: 'EverySecond', handler: () => { + console.log('Running EverySecond task') return { output: {}, } From a5916de3c82d37aaa81a6f0c4934ebcc685c5e96 Mon Sep 17 00:00:00 2001 From: Alessio Gravili Date: Fri, 13 Jun 2025 11:22:00 -0700 Subject: [PATCH 03/18] scheduler property --- packages/payload/src/config/sanitize.ts | 66 ++++++++++++------- packages/payload/src/queues/config/index.ts | 2 +- .../payload/src/queues/config/types/index.ts | 12 ++++ .../payload/src/queues/restEndpointRun.ts | 4 +- test/queues/config.ts | 1 + 5 files changed, 58 insertions(+), 27 deletions(-) diff --git a/packages/payload/src/config/sanitize.ts b/packages/payload/src/config/sanitize.ts index 415620cdac4..f33ced94d0a 100644 --- a/packages/payload/src/config/sanitize.ts +++ b/packages/payload/src/config/sanitize.ts @@ -303,35 +303,51 @@ export const sanitizeConfig = async (incomingConfig: Config): Promise defaultAmount) { - // eslint-disable-next-line no-console - console.warn( - `The jobsCollectionOverrides function is returning a collection with an additional ${hook} hook defined. These hooks will not run unless the jobs.runHooks option is set to true. Setting this option to true will negatively impact performance.`, - ) - break - } + if (typeof configWithDefaults.jobs.jobsCollectionOverrides === 'function') { + defaultJobsCollection = configWithDefaults.jobs.jobsCollectionOverrides({ + defaultJobsCollection, + }) + + const hooks = defaultJobsCollection?.hooks + // @todo - delete this check in 4.0 + if (hooks && config?.jobs?.runHooks !== true) { + for (const hook of Object.keys(hooks)) { + const defaultAmount = hook === 'afterRead' || hook === 'beforeChange' ? 1 : 0 + if (hooks[hook as keyof typeof hooks]!.length > defaultAmount) { + // eslint-disable-next-line no-console + console.warn( + `The jobsCollectionOverrides function is returning a collection with an additional ${hook} hook defined. These hooks will not run unless the jobs.runHooks option is set to true. Setting this option to true will negatively impact performance.`, + ) + break } } } - const sanitizedJobsCollection = await sanitizeCollection( - config as unknown as Config, - defaultJobsCollection, - richTextSanitizationPromises, - validRelationships, - ) + } + const sanitizedJobsCollection = await sanitizeCollection( + config as unknown as Config, + defaultJobsCollection, + richTextSanitizationPromises, + validRelationships, + ) + + configWithDefaults.collections!.push(sanitizedJobsCollection) + + // Check for schedule property in both tasks and workflows + let hasScheduleProperty = + config?.jobs?.tasks?.length && config.jobs.tasks.some((task) => task.schedule) - configWithDefaults.collections!.push(sanitizedJobsCollection) + if ( + !hasScheduleProperty && + config?.jobs?.workflows?.length && + config.jobs.workflows.some((workflow) => workflow.schedule) + ) { + hasScheduleProperty = true + } + + if (!config.jobs?.scheduler && hasScheduleProperty) { + throw new InvalidConfiguration( + 'The jobs.scheduler property must be set when using scheduled tasks or workflows. Otherwise, the schedule property has no effect.', + ) } } diff --git a/packages/payload/src/queues/config/index.ts b/packages/payload/src/queues/config/index.ts index 55852db93d5..2c92c96553f 100644 --- a/packages/payload/src/queues/config/index.ts +++ b/packages/payload/src/queues/config/index.ts @@ -8,7 +8,7 @@ import { getJobTaskStatus } from '../utilities/getJobTaskStatus.js' export const jobsCollectionSlug = 'payload-jobs' -export const getDefaultJobsCollection: (config: Config) => CollectionConfig | null = (config) => { +export const getDefaultJobsCollection: (config: Config) => CollectionConfig = (config) => { const workflowSlugs: Set = new Set() const taskSlugs: Set = new Set(['inline']) diff --git a/packages/payload/src/queues/config/types/index.ts b/packages/payload/src/queues/config/types/index.ts index f80211e3fdd..dbd239065e2 100644 --- a/packages/payload/src/queues/config/types/index.ts +++ b/packages/payload/src/queues/config/types/index.ts @@ -111,6 +111,18 @@ export type JobsConfig = { * @default false */ runHooks?: boolean + /** + * Determines how task / workflow schedules should be handled. This + * property needs to be set in order for the `schedule` property to work. + * + * If set to `cron`, the job system will use cron jobs to run scheduled tasks. + * If set to `runEndpoint`, the job system will run scheduled tasks when calling the /api/payload-jobs/run endpoint. + * + * @remark On serverless platforms like Vercel, you should use `runEndpoint` to avoid issues with cron jobs not running as expected. + * You can then use Vercel Cron to call the `/api/payload-jobs/run` endpoint at specified intervals - this will handle both schedules + * and autorunning jobs. + */ + scheduler?: 'cron' | 'runEndpoint' /** * A function that will be executed before Payload picks up jobs which are configured by the `jobs.autorun` function. * If this function returns true, jobs will be queried and picked up. If it returns false, jobs will not be run. diff --git a/packages/payload/src/queues/restEndpointRun.ts b/packages/payload/src/queues/restEndpointRun.ts index d246c9e75f9..1114755d7cd 100644 --- a/packages/payload/src/queues/restEndpointRun.ts +++ b/packages/payload/src/queues/restEndpointRun.ts @@ -30,7 +30,9 @@ export const runJobsEndpoint: Endpoint = { queue?: string } - await handleSchedules() + if (req?.payload?.config?.jobs?.scheduler === 'runEndpoint') { + await handleSchedules() + } const runJobsArgs: RunJobsArgs = { queue, diff --git a/test/queues/config.ts b/test/queues/config.ts index 93820759a55..f281ba97f59 100644 --- a/test/queues/config.ts +++ b/test/queues/config.ts @@ -116,6 +116,7 @@ export default buildConfigWithDefaults({ }, } }, + scheduler: 'runEndpoint', processingOrder: { queues: { lifo: '-createdAt', From 35246cd4691652b0fa0922238b33abee86ebe580 Mon Sep 17 00:00:00 2001 From: Alessio Gravili Date: Sun, 15 Jun 2025 15:43:28 -0700 Subject: [PATCH 04/18] wip --- .../payload/src/queues/restEndpointRun.ts | 45 +++++++++++++++++-- 1 file changed, 41 insertions(+), 4 deletions(-) diff --git a/packages/payload/src/queues/restEndpointRun.ts b/packages/payload/src/queues/restEndpointRun.ts index 1114755d7cd..494ccd3e1b6 100644 --- a/packages/payload/src/queues/restEndpointRun.ts +++ b/packages/payload/src/queues/restEndpointRun.ts @@ -1,4 +1,5 @@ import type { Endpoint, SanitizedConfig } from '../config/types.js' +import type { PayloadRequest } from '../types/index.js' import { runJobs, type RunJobsArgs } from './operations/runJobs/index.js' @@ -24,14 +25,24 @@ export const runJobsEndpoint: Endpoint = { ) } - const { allQueues, limit, queue } = req.query as { + const { + allQueues, + handleSchedules: handleSchedulesParam, + limit, + queue, + } = req.query as { allQueues?: boolean + handleSchedules?: boolean limit?: number queue?: string } - if (req?.payload?.config?.jobs?.scheduler === 'runEndpoint') { - await handleSchedules() + const shouldHandleSchedules = + handleSchedulesParam && + !(typeof handleSchedulesParam === 'string' && handleSchedulesParam === 'false') + + if (req?.payload?.config?.jobs?.scheduler === 'runEndpoint' || shouldHandleSchedules) { + await handleSchedules({ req }) } const runJobsArgs: RunJobsArgs = { @@ -101,4 +112,30 @@ const configHasJobs = (config: SanitizedConfig): boolean => { * The benefit of doing it like this instead of a separate endpoint is that we can run jobs immediately * after they are scheduled */ -async function handleSchedules() {} +async function handleSchedules({ req }: { req: PayloadRequest }) { + const jobsConfig = req.payload.config.jobs + + const tasksWithSchedules = jobsConfig.tasks.filter((task) => { + return task.schedule?.length + }) + + const workflowsWithSchedules = jobsConfig.workflows.filter((workflow) => { + return workflow.schedule?.length + }) + + const allScheduleQueues = [ + ...tasksWithSchedules.flatMap( + (task) => task.schedule && task.schedule.map((schedule) => schedule.queue), + ), + ...workflowsWithSchedules.flatMap( + (workflow) => workflow.schedule && workflow.schedule.map((schedule) => schedule.queue), + ), + ] + + for (const queue of allScheduleQueues) { + const activeTasksForQueue = await req.payload.find({ + collection: 'payload-jobs', + where: {}, + }) + } +} From b105425768eba545ad306784353157e4128a91b7 Mon Sep 17 00:00:00 2001 From: Alessio Gravili Date: Tue, 17 Jun 2025 15:35:29 -0700 Subject: [PATCH 05/18] typescript --- packages/payload/src/queues/restEndpointRun.ts | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/packages/payload/src/queues/restEndpointRun.ts b/packages/payload/src/queues/restEndpointRun.ts index ee784c551a8..509ad3ac58a 100644 --- a/packages/payload/src/queues/restEndpointRun.ts +++ b/packages/payload/src/queues/restEndpointRun.ts @@ -117,13 +117,15 @@ const configHasJobs = (config: SanitizedConfig): boolean => { async function handleSchedules({ req }: { req: PayloadRequest }) { const jobsConfig = req.payload.config.jobs - const tasksWithSchedules = jobsConfig.tasks.filter((task) => { - return task.schedule?.length - }) - - const workflowsWithSchedules = jobsConfig.workflows.filter((workflow) => { - return workflow.schedule?.length - }) + const tasksWithSchedules = + jobsConfig.tasks?.filter((task) => { + return task.schedule?.length + }) ?? [] + + const workflowsWithSchedules = + jobsConfig.workflows?.filter((workflow) => { + return workflow.schedule?.length + }) ?? [] const allScheduleQueues = [ ...tasksWithSchedules.flatMap( From ae2f0d9338432fa822be8740ba5cde9ce23e110b Mon Sep 17 00:00:00 2001 From: Alessio Gravili Date: Wed, 18 Jun 2025 21:18:55 -0700 Subject: [PATCH 06/18] done with the function --- packages/payload/src/config/sanitize.ts | 23 ++- .../payload/src/database/defaultUpdateJobs.ts | 2 +- packages/payload/src/index.ts | 2 +- .../queues/config/{index.ts => collection.ts} | 0 packages/payload/src/queues/config/global.ts | 45 +++++ .../payload/src/queues/config/types/index.ts | 93 +++++++++- .../src/queues/config/types/taskTypes.ts | 3 + .../src/queues/config/types/workflowTypes.ts | 3 + packages/payload/src/queues/localAPI.ts | 2 +- .../handleSchedules/defaultAfterSchedule.ts | 46 +++++ .../handleSchedules/defaultBeforeSchedule.ts | 49 +++++ .../operations/handleSchedules/index.ts | 169 ++++++++++++++++++ .../src/queues/operations/runJobs/index.ts | 2 +- .../payload/src/queues/restEndpointRun.ts | 57 ++---- .../payload/src/queues/utilities/updateJob.ts | 2 +- .../versions/deleteScheduledPublishJobs.ts | 2 +- 16 files changed, 441 insertions(+), 59 deletions(-) rename packages/payload/src/queues/config/{index.ts => collection.ts} (100%) create mode 100644 packages/payload/src/queues/config/global.ts create mode 100644 packages/payload/src/queues/operations/handleSchedules/defaultAfterSchedule.ts create mode 100644 packages/payload/src/queues/operations/handleSchedules/defaultBeforeSchedule.ts create mode 100644 packages/payload/src/queues/operations/handleSchedules/index.ts diff --git a/packages/payload/src/config/sanitize.ts b/packages/payload/src/config/sanitize.ts index 739736d7841..4f9b64bbba7 100644 --- a/packages/payload/src/config/sanitize.ts +++ b/packages/payload/src/config/sanitize.ts @@ -33,7 +33,8 @@ import { } from '../locked-documents/config.js' import { getPreferencesCollection, preferencesCollectionSlug } from '../preferences/config.js' import { getQueryPresetsConfig, queryPresetsCollectionSlug } from '../query-presets/config.js' -import { getDefaultJobsCollection, jobsCollectionSlug } from '../queues/config/index.js' +import { getDefaultJobsCollection, jobsCollectionSlug } from '../queues/config/collection.js' +import { getJobStatsGlobal } from '../queues/config/global.js' import { flattenBlock } from '../utilities/flattenAllFields.js' import { getSchedulePublishTask } from '../versions/schedule/job.js' import { addDefaultsToConfig } from './defaults.js' @@ -335,7 +336,6 @@ export const sanitizeConfig = async (incomingConfig: Config): Promise task.schedule) - configWithDefaults.collections!.push(sanitizedJobsCollection) if ( !hasScheduleProperty && config?.jobs?.workflows?.length && @@ -344,10 +344,23 @@ export const sanitizeConfig = async (incomingConfig: Config): Promise GlobalConfig = (config) => { + return { + slug: jobStatsGlobalSlug, + fields: [ + { + name: 'stats', + type: 'json', + }, + ], + } +} diff --git a/packages/payload/src/queues/config/types/index.ts b/packages/payload/src/queues/config/types/index.ts index eac4649f7f9..1aaa4b8fa19 100644 --- a/packages/payload/src/queues/config/types/index.ts +++ b/packages/payload/src/queues/config/types/index.ts @@ -1,6 +1,7 @@ -import type { CollectionConfig } from '../../../index.js' +import type { CollectionConfig, Job } from '../../../index.js' import type { Payload, PayloadRequest, Sort } from '../../../types/index.js' import type { RunJobsArgs } from '../../operations/runJobs/index.js' +import type { JobStats } from '../global.js' import type { TaskConfig } from './taskTypes.js' import type { WorkflowConfig } from './workflowTypes.js' @@ -48,6 +49,11 @@ export type SanitizedJobsConfig = { * This property is automatically set during sanitization. */ enabled?: boolean + /** + * If set to `true`, a payload-job-stats global exists. + * This property is automatically set during sanitization. + */ + enabledStats?: boolean } & JobsConfig export type JobsConfig = { /** @@ -123,13 +129,14 @@ export type JobsConfig = { * property needs to be set in order for the `schedule` property to work. * * If set to `cron`, the job system will use cron jobs to run scheduled tasks. - * If set to `runEndpoint`, the job system will run scheduled tasks when calling the /api/payload-jobs/run endpoint. + * If set to `manual`, you are responsible for handling schedules yourself, for example by calling the `/api/payload-jobs/handleSchedules` endpoints + * + * @remark On serverless platforms like Vercel, you should use `manual` to avoid issues with cron jobs not running as expected. + * You can then use Vercel Cron to call the `/api/payload-jobs/handleSchedules` endpoint at specified intervals * - * @remark On serverless platforms like Vercel, you should use `runEndpoint` to avoid issues with cron jobs not running as expected. - * You can then use Vercel Cron to call the `/api/payload-jobs/run` endpoint at specified intervals - this will handle both schedules - * and autorunning jobs. + * @default 'manual' */ - scheduler?: 'cron' | 'runEndpoint' + scheduler?: 'cron' | 'manual' /** * A function that will be executed before Payload picks up jobs which are configured by the `jobs.autorun` function. * If this function returns true, jobs will be queried and picked up. If it returns false, jobs will not be run. @@ -147,6 +154,61 @@ export type JobsConfig = { workflows?: WorkflowConfig[] } +export type Queueable = { + scheduleConfig: ScheduleConfig + taskConfig?: TaskConfig + // If not set, queue it immediately + waitUntil?: Date + workflowConfig?: WorkflowConfig +} + +type OptionalPromise = Promise | T + +export type BeforeScheduleFn = (args: { + defaultBeforeSchedule: BeforeScheduleFn + /** + * payload-job-stats global data + */ + jobStats: JobStats + queueable: Queueable + req: PayloadRequest +}) => OptionalPromise<{ + input?: object + shouldSchedule: boolean + waitUntil?: Date +}> + +export type AfterScheduleFn = ( + args: { + defaultAfterSchedule: AfterScheduleFn + /** + * payload-job-stats global data + */ + jobStats: JobStats + queueable: Queueable + req: PayloadRequest + } & ( + | { + error: Error + job?: never + status: 'error' + } + | { + error?: never + job: Job + status: 'success' + } + | { + error?: never + job?: never + /** + * If the beforeSchedule hook returned `shouldSchedule: false`, this will be called with status `skipped`. + */ + status: 'skipped' + } + ), +) => OptionalPromise + export type ScheduleConfig = { /** * The cron for scheduling the job. @@ -168,6 +230,25 @@ export type ScheduleConfig = { * - '* * * * * *' every second */ cron: string + hooks?: { + /** + * Functions that will be executed after the job has been successfully scheduled. + * + * @default By default, global update?? Unless global update should happen before + */ + afterSchedule?: AfterScheduleFn + /** + * Functions that will be executed before the job is scheduled. + * You can use this to control whether or not the job should be scheduled, or what input + * data should be passed to the job. + * + * @default By default, this has one function that returns { shouldSchedule: true } if the following conditions are met: + * - There currently is no job of the same type in the specified queue that is currently running + * - There currently is no job of the same type in the specified queue that is scheduled to run in the future + * - There currently is no job of the same type in the specified queue that failed previously but can be retried + */ + beforeSchedule?: BeforeScheduleFn + } /** * Queue to which the scheduled job will be added. */ diff --git a/packages/payload/src/queues/config/types/taskTypes.ts b/packages/payload/src/queues/config/types/taskTypes.ts index 1b7fe822cf3..48edff68819 100644 --- a/packages/payload/src/queues/config/types/taskTypes.ts +++ b/packages/payload/src/queues/config/types/taskTypes.ts @@ -55,6 +55,9 @@ export type TaskHandler< args: TaskHandlerArgs, ) => Promise> | TaskHandlerResult +/** + * @todo rename to TaskSlug in 4.0, similar to CollectionSlug + */ export type TaskType = StringKeyOf // Extracts the type of `input` corresponding to each task diff --git a/packages/payload/src/queues/config/types/workflowTypes.ts b/packages/payload/src/queues/config/types/workflowTypes.ts index c967448499b..46575313de2 100644 --- a/packages/payload/src/queues/config/types/workflowTypes.ts +++ b/packages/payload/src/queues/config/types/workflowTypes.ts @@ -64,6 +64,9 @@ export type BaseJob< workflowSlug?: null | WorkflowTypes } +/** + * @todo rename to WorkflowSlug in 4.0, similar to CollectionSlug + */ export type WorkflowTypes = StringKeyOf /** diff --git a/packages/payload/src/queues/localAPI.ts b/packages/payload/src/queues/localAPI.ts index f1449c5664e..cb8ef5a81bd 100644 --- a/packages/payload/src/queues/localAPI.ts +++ b/packages/payload/src/queues/localAPI.ts @@ -9,7 +9,7 @@ import { type TypedJobs, type Where, } from '../index.js' -import { jobAfterRead, jobsCollectionSlug } from './config/index.js' +import { jobAfterRead, jobsCollectionSlug } from './config/collection.js' import { runJobs } from './operations/runJobs/index.js' import { updateJob, updateJobs } from './utilities/updateJob.js' diff --git a/packages/payload/src/queues/operations/handleSchedules/defaultAfterSchedule.ts b/packages/payload/src/queues/operations/handleSchedules/defaultAfterSchedule.ts new file mode 100644 index 00000000000..11fd1ba5caa --- /dev/null +++ b/packages/payload/src/queues/operations/handleSchedules/defaultAfterSchedule.ts @@ -0,0 +1,46 @@ +import type { AfterScheduleFn } from '../../config/types/index.js' + +import { type JobStats, jobStatsGlobalSlug } from '../../config/global.js' + +// JobStats['stats']['scheduledRuns]['queues'] but correct, handling the undefined cases +type JobStatsScheduledRuns = NonNullable< + NonNullable['scheduledRuns']>['queues'] +>[string] + +export const defaultAfterSchedule: AfterScheduleFn = async ({ jobStats, queueable, req }) => { + const existingQueuesConfig = + jobStats.stats?.scheduledRuns?.queues?.[queueable.scheduleConfig.queue] || {} + + const queueConfig: JobStatsScheduledRuns = { + ...existingQueuesConfig, + } + if (queueable.taskConfig) { + ;(queueConfig.tasks ??= {})[queueable.taskConfig.slug] = { + lastScheduledRun: new Date().toISOString(), + } + } else if (queueable.workflowConfig) { + ;(queueConfig.workflows ??= {})[queueable.workflowConfig.slug] = { + lastScheduledRun: new Date().toISOString(), + } + } + + // Add to payload-jobs-stats global regardless of the status + await req.payload.db.updateGlobal({ + slug: jobStatsGlobalSlug, + data: { + ...(jobStats.stats || {}), + stats: { + ...(jobStats.stats || {}), + scheduledRuns: { + ...(jobStats.stats?.scheduledRuns || {}), + queues: { + ...(jobStats.stats?.scheduledRuns?.queues || {}), + [queueable.scheduleConfig.queue]: queueConfig, + }, + }, + }, + } as JobStats, + req, + returning: false, + }) +} diff --git a/packages/payload/src/queues/operations/handleSchedules/defaultBeforeSchedule.ts b/packages/payload/src/queues/operations/handleSchedules/defaultBeforeSchedule.ts new file mode 100644 index 00000000000..b9fd0c6b661 --- /dev/null +++ b/packages/payload/src/queues/operations/handleSchedules/defaultBeforeSchedule.ts @@ -0,0 +1,49 @@ +import type { Where } from '../../../types/index.js' +import type { BeforeScheduleFn } from '../../config/types/index.js' + +export const defaultBeforeSchedule: BeforeScheduleFn = async ({ queueable, req }) => { + // All tasks in that queue that are either currently processing or can be run + const and: Where[] = [ + // TODO: Can we filter only jobs that have been created through the scheduling system? + { + queue: { + equals: queueable.scheduleConfig.queue, + }, + }, + + { + completedAt: { exists: false }, + }, + { + error: { exists: false }, + }, + ] + + if (queueable.taskConfig) { + and.push({ + taskSlug: { + equals: queueable.taskConfig.slug, + }, + }) + } else if (queueable.workflowConfig) { + and.push({ + workflowSlug: { + equals: queueable.workflowConfig.slug, + }, + }) + } + + const activeTasksForQueue = await req.payload.db.count({ + collection: 'payload-jobs', + req, + where: { + and, + }, + }) + + return { + input: {}, + shouldSchedule: activeTasksForQueue.totalDocs === 0, + waitUntil: queueable.waitUntil, + } +} diff --git a/packages/payload/src/queues/operations/handleSchedules/index.ts b/packages/payload/src/queues/operations/handleSchedules/index.ts new file mode 100644 index 00000000000..2d478945eab --- /dev/null +++ b/packages/payload/src/queues/operations/handleSchedules/index.ts @@ -0,0 +1,169 @@ +import { Cron } from 'croner' + +import type { Job } from '../../../index.js' +import type { PayloadRequest } from '../../../types/index.js' +import type { BeforeScheduleFn, Queueable, ScheduleConfig } from '../../config/types/index.js' +import type { TaskConfig } from '../../config/types/taskTypes.js' +import type { WorkflowConfig } from '../../config/types/workflowTypes.js' + +import { type JobStats, jobStatsGlobalSlug } from '../../config/global.js' +import { defaultAfterSchedule } from './defaultAfterSchedule.js' +import { defaultBeforeSchedule } from './defaultBeforeSchedule.js' + +/** + * On vercel, we cannot auto-schedule jobs using a Cron - instead, we'll use this same endpoint that can + * also be called from Vercel Cron for auto-running jobs. + * + * The benefit of doing it like this instead of a separate endpoint is that we can run jobs immediately + * after they are scheduled + */ +export async function handleSchedules({ req }: { req: PayloadRequest }) { + const jobsConfig = req.payload.config.jobs + + const tasksWithSchedules = + jobsConfig.tasks?.filter((task) => { + return task.schedule?.length + }) ?? [] + + const workflowsWithSchedules = + jobsConfig.workflows?.filter((workflow) => { + return workflow.schedule?.length + }) ?? [] + + const queuesWithSchedules: { + [queue: string]: { + schedules: { + scheduleConfig: ScheduleConfig + taskConfig?: TaskConfig + workflowConfig?: WorkflowConfig + }[] + } + } = {} + + for (const task of tasksWithSchedules) { + for (const schedule of task.schedule ?? []) { + ;(queuesWithSchedules[schedule.queue] ??= { schedules: [] }).schedules.push({ + scheduleConfig: schedule, + taskConfig: task, + }) + } + } + for (const workflow of workflowsWithSchedules) { + for (const schedule of workflow.schedule ?? []) { + ;(queuesWithSchedules[schedule.queue] ??= { schedules: [] }).schedules.push({ + scheduleConfig: schedule, + workflowConfig: workflow, + }) + } + } + + const stats: JobStats = await req.payload.db.findGlobal({ + slug: jobStatsGlobalSlug, + req, + }) + + /** + * Almost last step! Tasks and Workflows added here just need to be constraint-checkec (e.g max. 1 running task etc.), + * before we can queue them + */ + const workflowsToQueue: Queueable[] = [] + const tasksToQueue: Queueable[] = [] + + // Need to know when that particular job was last scheduled in that particular queue + + for (const [queueName, { schedules }] of Object.entries(queuesWithSchedules)) { + const queueScheduleStats = stats?.stats?.scheduledRuns?.queues?.[queueName] + + for (const schedulable of schedules) { + const lastScheduledRun = schedulable.taskConfig + ? queueScheduleStats?.tasks?.[schedulable.taskConfig.slug]?.lastScheduledRun + : queueScheduleStats?.workflows?.[schedulable.workflowConfig?.slug ?? '']?.lastScheduledRun + + const nextRun = new Cron(schedulable.scheduleConfig.cron).nextRun(lastScheduledRun ?? null) + + if (!nextRun) { + continue + } + + if (schedulable.taskConfig) { + tasksToQueue.push({ + scheduleConfig: schedulable.scheduleConfig, + taskConfig: schedulable.taskConfig, + waitUntil: nextRun, + }) + } else if (schedulable.workflowConfig) { + workflowsToQueue.push({ + scheduleConfig: schedulable.scheduleConfig, + waitUntil: nextRun, + workflowConfig: schedulable.workflowConfig, + }) + } + } + } + + /** + * Now queue, but check for constraints (= beforeSchedule) first. + * Default constraint (= defaultBeforeSchedule): max. 1 running / scheduled task or workflow per queue + */ + for (const queueable of [...tasksToQueue, ...workflowsToQueue]) { + if (!queueable.taskConfig && !queueable.workflowConfig) { + continue + } + + const beforeScheduleFn = queueable.scheduleConfig.hooks?.beforeSchedule + const afterScheduleFN = queueable.scheduleConfig.hooks?.afterSchedule + + try { + const beforeScheduleResult: Awaited> = await ( + beforeScheduleFn ?? defaultBeforeSchedule + )({ + // @ts-expect-error we know defaultBeforeSchedule will never call itself => pass null + defaultBeforeSchedule: beforeScheduleFn ? defaultBeforeSchedule : null, + jobStats: stats, + queueable, + req, + }) + + if (!beforeScheduleResult.shouldSchedule) { + await (afterScheduleFN ?? defaultAfterSchedule)({ + // @ts-expect-error we know defaultAfterchedule will never call itself => pass null + defaultAfterSchedule: afterScheduleFN ? defaultAfterSchedule : null, + jobStats: stats, + queueable, + req, + status: 'skipped', + }) + continue + } + + const job = (await req.payload.jobs.queue({ + input: beforeScheduleResult.input ?? {}, + queue: queueable.scheduleConfig.queue, + req, + task: queueable?.taskConfig?.slug, + waitUntil: beforeScheduleResult.waitUntil, + workflow: queueable.workflowConfig?.slug, + } as Parameters[0])) as unknown as Job + + await (afterScheduleFN ?? defaultAfterSchedule)({ + // @ts-expect-error we know defaultAfterchedule will never call itself => pass null + defaultAfterSchedule: afterScheduleFN ? defaultAfterSchedule : null, + job, + jobStats: stats, + queueable, + req, + status: 'success', + }) + } catch (error) { + await (afterScheduleFN ?? defaultAfterSchedule)({ + // @ts-expect-error we know defaultAfterchedule will never call itself => pass null + defaultAfterSchedule: afterScheduleFN ? defaultAfterSchedule : null, + error: error as Error, + jobStats: stats, + queueable, + req, + status: 'error', + }) + } + } +} diff --git a/packages/payload/src/queues/operations/runJobs/index.ts b/packages/payload/src/queues/operations/runJobs/index.ts index 99f6482e619..5c85fa73b13 100644 --- a/packages/payload/src/queues/operations/runJobs/index.ts +++ b/packages/payload/src/queues/operations/runJobs/index.ts @@ -6,7 +6,7 @@ import type { RunJobResult } from './runJob/index.js' import { Forbidden } from '../../../errors/Forbidden.js' import isolateObjectProperty from '../../../utilities/isolateObjectProperty.js' -import { jobsCollectionSlug } from '../../config/index.js' +import { jobsCollectionSlug } from '../../config/collection.js' import { JobCancelledError } from '../../errors/index.js' import { updateJob, updateJobs } from '../../utilities/updateJob.js' import { getUpdateJobFunction } from './runJob/getUpdateJobFunction.js' diff --git a/packages/payload/src/queues/restEndpointRun.ts b/packages/payload/src/queues/restEndpointRun.ts index 9eccb151c89..5b0967b5664 100644 --- a/packages/payload/src/queues/restEndpointRun.ts +++ b/packages/payload/src/queues/restEndpointRun.ts @@ -1,6 +1,9 @@ import type { Endpoint, SanitizedConfig } from '../config/types.js' import type { PayloadRequest } from '../types/index.js' +import type { SanitizedJobsConfig } from './config/types/index.js' +import { type JobStats, jobStatsGlobalSlug } from './config/global.js' +import { handleSchedules } from './operations/handleSchedules/index.js' import { runJobs, type RunJobsArgs } from './operations/runJobs/index.js' /** @@ -8,7 +11,9 @@ import { runJobs, type RunJobsArgs } from './operations/runJobs/index.js' */ export const runJobsEndpoint: Endpoint = { handler: async (req) => { - if (!configHasJobs(req.payload.config)) { + const jobsConfig = req.payload.config.jobs + + if (!configHasJobs(jobsConfig)) { return Response.json( { message: 'No jobs to run.', @@ -17,7 +22,7 @@ export const runJobsEndpoint: Endpoint = { ) } - const accessFn = req.payload.config.jobs?.access?.run ?? (() => true) + const accessFn = jobsConfig.access?.run ?? (() => true) const hasAccess = await accessFn({ req }) @@ -46,7 +51,12 @@ export const runJobsEndpoint: Endpoint = { handleSchedulesParam && !(typeof handleSchedulesParam === 'string' && handleSchedulesParam === 'false') - if (req?.payload?.config?.jobs?.scheduler === 'runEndpoint' || shouldHandleSchedules) { + if (shouldHandleSchedules) { + if (!jobsConfig.enabledStats) { + throw new Error( + 'The jobs stats global is not enabled, but is required to use the run endpoint with schedules.', + ) + } await handleSchedules({ req }) } @@ -106,43 +116,6 @@ export const runJobsEndpoint: Endpoint = { path: '/run', } -const configHasJobs = (config: SanitizedConfig): boolean => { - return Boolean(config.jobs?.tasks?.length || config.jobs?.workflows?.length) -} - -/** - * On vercel, we cannot auto-schedule jobs using a Cron - instead, we'll use this same endpoint that can - * also be called from Vercel Cron for auto-running jobs. - * - * The benefit of doing it like this instead of a separate endpoint is that we can run jobs immediately - * after they are scheduled - */ -async function handleSchedules({ req }: { req: PayloadRequest }) { - const jobsConfig = req.payload.config.jobs - - const tasksWithSchedules = - jobsConfig.tasks?.filter((task) => { - return task.schedule?.length - }) ?? [] - - const workflowsWithSchedules = - jobsConfig.workflows?.filter((workflow) => { - return workflow.schedule?.length - }) ?? [] - - const allScheduleQueues = [ - ...tasksWithSchedules.flatMap( - (task) => task.schedule && task.schedule.map((schedule) => schedule.queue), - ), - ...workflowsWithSchedules.flatMap( - (workflow) => workflow.schedule && workflow.schedule.map((schedule) => schedule.queue), - ), - ] - - for (const queue of allScheduleQueues) { - const activeTasksForQueue = await req.payload.find({ - collection: 'payload-jobs', - where: {}, - }) - } +const configHasJobs = (jobsConfig: SanitizedJobsConfig): boolean => { + return Boolean(jobsConfig.tasks?.length || jobsConfig.workflows?.length) } diff --git a/packages/payload/src/queues/utilities/updateJob.ts b/packages/payload/src/queues/utilities/updateJob.ts index 6ce4479eaad..a8a4ff69eee 100644 --- a/packages/payload/src/queues/utilities/updateJob.ts +++ b/packages/payload/src/queues/utilities/updateJob.ts @@ -3,7 +3,7 @@ import type { UpdateJobsArgs } from '../../database/types.js' import type { Job } from '../../index.js' import type { PayloadRequest, Sort, Where } from '../../types/index.js' -import { jobAfterRead, jobsCollectionSlug } from '../config/index.js' +import { jobAfterRead, jobsCollectionSlug } from '../config/collection.js' type BaseArgs = { data: Partial diff --git a/packages/payload/src/versions/deleteScheduledPublishJobs.ts b/packages/payload/src/versions/deleteScheduledPublishJobs.ts index 4020ad4fd6f..6ce4199f8c2 100644 --- a/packages/payload/src/versions/deleteScheduledPublishJobs.ts +++ b/packages/payload/src/versions/deleteScheduledPublishJobs.ts @@ -1,7 +1,7 @@ import type { PayloadRequest } from '../types/index.js' import { type Payload } from '../index.js' -import { jobsCollectionSlug } from '../queues/config/index.js' +import { jobsCollectionSlug } from '../queues/config/collection.js' type Args = { id?: number | string From 318bc04122ac602973e161ca21ab8051bd6c32d5 Mon Sep 17 00:00:00 2001 From: Alessio Gravili Date: Wed, 18 Jun 2025 21:20:57 -0700 Subject: [PATCH 07/18] add local api --- packages/payload/src/queues/localAPI.ts | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/packages/payload/src/queues/localAPI.ts b/packages/payload/src/queues/localAPI.ts index cb8ef5a81bd..9b777f4b75a 100644 --- a/packages/payload/src/queues/localAPI.ts +++ b/packages/payload/src/queues/localAPI.ts @@ -10,10 +10,18 @@ import { type Where, } from '../index.js' import { jobAfterRead, jobsCollectionSlug } from './config/collection.js' +import { handleSchedules } from './operations/handleSchedules/index.js' import { runJobs } from './operations/runJobs/index.js' import { updateJob, updateJobs } from './utilities/updateJob.js' export const getJobsLocalAPI = (payload: Payload) => ({ + handleSchedules: async (args?: { req?: PayloadRequest }): Promise => { + const newReq: PayloadRequest = args?.req ?? (await createLocalReq({}, payload)) + + await handleSchedules({ + req: newReq, + }) + }, queue: async < // eslint-disable-next-line @typescript-eslint/no-duplicate-type-constituents TTaskOrWorkflowSlug extends keyof TypedJobs['tasks'] | keyof TypedJobs['workflows'], From 3556edfefe2e941230929536a289fe159e879a5b Mon Sep 17 00:00:00 2001 From: Alessio Gravili Date: Wed, 18 Jun 2025 21:24:43 -0700 Subject: [PATCH 08/18] handleSchedules endpoint --- .../payload/src/queues/config/collection.ts | 5 +- .../src/queues/endpoints/handleSchedules.ts | 56 +++++++++++++++++++ .../{restEndpointRun.ts => endpoints/run.ts} | 12 ++-- 3 files changed, 64 insertions(+), 9 deletions(-) create mode 100644 packages/payload/src/queues/endpoints/handleSchedules.ts rename packages/payload/src/queues/{restEndpointRun.ts => endpoints/run.ts} (84%) diff --git a/packages/payload/src/queues/config/collection.ts b/packages/payload/src/queues/config/collection.ts index 9628a29bfff..2d22d318c71 100644 --- a/packages/payload/src/queues/config/collection.ts +++ b/packages/payload/src/queues/config/collection.ts @@ -3,7 +3,8 @@ import type { Config, SanitizedConfig } from '../../config/types.js' import type { Field } from '../../fields/config/types.js' import type { Job } from '../../index.js' -import { runJobsEndpoint } from '../restEndpointRun.js' +import { handleSchedulesJobsEndpoint } from '../endpoints/handleSchedules.js' +import { runJobsEndpoint } from '../endpoints/run.js' import { getJobTaskStatus } from '../utilities/getJobTaskStatus.js' export const jobsCollectionSlug = 'payload-jobs' @@ -102,7 +103,7 @@ export const getDefaultJobsCollection: (config: Config) => CollectionConfig = (c group: 'System', hidden: true, }, - endpoints: [runJobsEndpoint], + endpoints: [runJobsEndpoint, handleSchedulesJobsEndpoint], fields: [ { name: 'input', diff --git a/packages/payload/src/queues/endpoints/handleSchedules.ts b/packages/payload/src/queues/endpoints/handleSchedules.ts new file mode 100644 index 00000000000..d3147eb8f98 --- /dev/null +++ b/packages/payload/src/queues/endpoints/handleSchedules.ts @@ -0,0 +1,56 @@ +import type { Endpoint } from '../../config/types.js' + +import { handleSchedules } from '../operations/handleSchedules/index.js' +import { configHasJobs } from './run.js' + +/** + * /api/payload-jobs/handleSchedules endpoint + */ +export const handleSchedulesJobsEndpoint: Endpoint = { + handler: async (req) => { + const jobsConfig = req.payload.config.jobs + + if (!configHasJobs(jobsConfig)) { + return Response.json( + { + message: 'No jobs to schedule.', + }, + { status: 200 }, + ) + } + + const accessFn = jobsConfig.access?.run ?? (() => true) + + const hasAccess = await accessFn({ req }) + + if (!hasAccess) { + return Response.json( + { + message: req.i18n.t('error:unauthorized'), + }, + { status: 401 }, + ) + } + + if (!jobsConfig.enabledStats) { + return Response.json( + { + message: + 'The jobs stats global is not enabled, but is required to use the run endpoint with schedules.', + }, + { status: 500 }, + ) + } + + await handleSchedules({ req }) + + return Response.json( + { + message: req.i18n.t('general:success'), + }, + { status: 200 }, + ) + }, + method: 'get', + path: '/handleSchedules', +} diff --git a/packages/payload/src/queues/restEndpointRun.ts b/packages/payload/src/queues/endpoints/run.ts similarity index 84% rename from packages/payload/src/queues/restEndpointRun.ts rename to packages/payload/src/queues/endpoints/run.ts index 5b0967b5664..6f22bad1e8c 100644 --- a/packages/payload/src/queues/restEndpointRun.ts +++ b/packages/payload/src/queues/endpoints/run.ts @@ -1,10 +1,8 @@ -import type { Endpoint, SanitizedConfig } from '../config/types.js' -import type { PayloadRequest } from '../types/index.js' -import type { SanitizedJobsConfig } from './config/types/index.js' +import type { Endpoint } from '../../config/types.js' +import type { SanitizedJobsConfig } from '../config/types/index.js' -import { type JobStats, jobStatsGlobalSlug } from './config/global.js' -import { handleSchedules } from './operations/handleSchedules/index.js' -import { runJobs, type RunJobsArgs } from './operations/runJobs/index.js' +import { handleSchedules } from '../operations/handleSchedules/index.js' +import { runJobs, type RunJobsArgs } from '../operations/runJobs/index.js' /** * /api/payload-jobs/run endpoint @@ -116,6 +114,6 @@ export const runJobsEndpoint: Endpoint = { path: '/run', } -const configHasJobs = (jobsConfig: SanitizedJobsConfig): boolean => { +export const configHasJobs = (jobsConfig: SanitizedJobsConfig): boolean => { return Boolean(jobsConfig.tasks?.length || jobsConfig.workflows?.length) } From 61725cb40aa7a600686b660c7028ba1ab1b8c4da Mon Sep 17 00:00:00 2001 From: Alessio Gravili Date: Wed, 18 Jun 2025 21:45:42 -0700 Subject: [PATCH 09/18] bump croner --- packages/payload/package.json | 2 +- pnpm-lock.yaml | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/packages/payload/package.json b/packages/payload/package.json index d26f960a84d..7fde736779f 100644 --- a/packages/payload/package.json +++ b/packages/payload/package.json @@ -92,7 +92,7 @@ "busboy": "^1.6.0", "ci-info": "^4.1.0", "console-table-printer": "2.12.1", - "croner": "9.0.0", + "croner": "9.1.0", "dataloader": "2.2.3", "deepmerge": "4.3.1", "file-type": "19.3.0", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 4be930216d7..c2e2100fea7 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -826,8 +826,8 @@ importers: specifier: 2.12.1 version: 2.12.1 croner: - specifier: 9.0.0 - version: 9.0.0 + specifier: 9.1.0 + version: 9.1.0 dataloader: specifier: 2.2.3 version: 2.2.3 @@ -6354,8 +6354,8 @@ packages: engines: {node: ^14.15.0 || ^16.10.0 || >=18.0.0} hasBin: true - croner@9.0.0: - resolution: {integrity: sha512-onMB0OkDjkXunhdW9htFjEhqrD54+M94i6ackoUkjHKbRnXdyEyKRelp4nJ1kAz32+s27jP1FsebpJCVl0BsvA==} + croner@9.1.0: + resolution: {integrity: sha512-p9nwwR4qyT5W996vBZhdvBCnMhicY5ytZkR4D1Xj0wuTDEiMnjwR57Q3RXYY/s0EpX6Ay3vgIcfaR+ewGHsi+g==} engines: {node: '>=18.0'} cross-env@7.0.3: @@ -16158,7 +16158,7 @@ snapshots: - supports-color - ts-node - croner@9.0.0: {} + croner@9.1.0: {} cross-env@7.0.3: dependencies: From caca0443e2ff2ca188c0748f69cdd8215bd6cc55 Mon Sep 17 00:00:00 2001 From: Alessio Gravili Date: Wed, 18 Jun 2025 21:46:32 -0700 Subject: [PATCH 10/18] improve example --- test/queues/config.ts | 40 ++++++++++++++++++++++++++++++++++------ 1 file changed, 34 insertions(+), 6 deletions(-) diff --git a/test/queues/config.ts b/test/queues/config.ts index d72ffdd6a54..bf74b1e45c1 100644 --- a/test/queues/config.ts +++ b/test/queues/config.ts @@ -1,4 +1,3 @@ -/* eslint-disable no-restricted-exports */ import type { TaskConfig } from 'payload' import { lexicalEditor } from '@payloadcms/richtext-lexical' @@ -32,7 +31,6 @@ import { workflowRetries2TasksRetriesUndefinedWorkflow } from './workflows/workf const filename = fileURLToPath(import.meta.url) const dirname = path.dirname(filename) -// eslint-disable-next-line no-restricted-exports export default buildConfigWithDefaults({ collections: [ { @@ -118,7 +116,7 @@ export default buildConfigWithDefaults({ }, } }, - scheduler: 'runEndpoint', + scheduler: 'manual', processingOrder: { queues: { lifo: '-createdAt', @@ -126,10 +124,40 @@ export default buildConfigWithDefaults({ }, tasks: [ { - schedule: [{ cron: '* * * * * *', queue: 'autorunSecond' }], + schedule: [ + { + cron: '* * * * * *', + queue: 'autorunSecond', + hooks: { + beforeSchedule: async (args) => { + const result = await args.defaultBeforeSchedule(args) // Handles verifying that there are no jobs already scheduled or processing + return { + ...result, + input: { + message: 'This task runs every second', + }, + } + }, + afterSchedule: async (args) => { + await args.defaultAfterSchedule(args) // Handles updating the payload-jobs-stats global + args.req.payload.logger.info( + 'EverySecond task scheduled:', + args.status === 'success' ? args.job.id : 'failed to schedule', + ) + }, + }, + }, + ], slug: 'EverySecond', - handler: () => { - console.log('Running EverySecond task') + inputSchema: [ + { + name: 'message', + type: 'text', + required: true, + }, + ], + handler: ({ input, req }) => { + req.payload.logger.info(input.message) return { output: {}, } From f38b5803fa6728191bfaae24796618e2bf8af59a Mon Sep 17 00:00:00 2001 From: Alessio Gravili Date: Wed, 18 Jun 2025 21:49:48 -0700 Subject: [PATCH 11/18] fix incorrect croner usage --- packages/payload/src/index.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/payload/src/index.ts b/packages/payload/src/index.ts index 0b4a44a9a67..0676c832c0e 100644 --- a/packages/payload/src/index.ts +++ b/packages/payload/src/index.ts @@ -811,8 +811,7 @@ export class BasePayload { if (!shouldAutoRun) { job.stop() - - return false + return } } From 0910627b4d41eeafa2610fa765aac801d4a5d8fb Mon Sep 17 00:00:00 2001 From: Alessio Gravili Date: Wed, 18 Jun 2025 22:56:59 -0700 Subject: [PATCH 12/18] fix: mongodb transform can error if db.updateGlobal is called without calling db.createGlobal --- packages/db-mongodb/src/utilities/transform.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packages/db-mongodb/src/utilities/transform.ts b/packages/db-mongodb/src/utilities/transform.ts index e5633f1acf9..ebb384d15df 100644 --- a/packages/db-mongodb/src/utilities/transform.ts +++ b/packages/db-mongodb/src/utilities/transform.ts @@ -374,6 +374,10 @@ export const transform = ({ parentIsLocalized = false, validateRelationships = true, }: Args) => { + if (!data) { + return null + } + if (Array.isArray(data)) { for (const item of data) { transform({ adapter, data: item, fields, globalSlug, operation, validateRelationships }) From 3a1c6e9110c843360cbf5e0b31479a2f3fb6d7af Mon Sep 17 00:00:00 2001 From: Alessio Gravili Date: Wed, 18 Jun 2025 22:57:11 -0700 Subject: [PATCH 13/18] clarifies the jsdocs for that --- packages/payload/src/database/types.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/payload/src/database/types.ts b/packages/payload/src/database/types.ts index ca94f769974..d3795584c3f 100644 --- a/packages/payload/src/database/types.ts +++ b/packages/payload/src/database/types.ts @@ -142,6 +142,9 @@ export interface BaseDatabaseAdapter { } } + /** + * Updates a global that exists. If the global doesn't exist yet, this will not work - you should use `createGlobal` instead. + */ updateGlobal: UpdateGlobal updateGlobalVersion: UpdateGlobalVersion From 9602a7074e2e7bda5ba8ec4374e0df01f4870786 Mon Sep 17 00:00:00 2001 From: Alessio Gravili Date: Wed, 18 Jun 2025 22:57:44 -0700 Subject: [PATCH 14/18] make it work --- .../payload/src/queues/config/types/index.ts | 4 +- .../handleSchedules/defaultAfterSchedule.ts | 53 ++++++++++++------- .../operations/handleSchedules/index.ts | 4 +- test/queues/config.ts | 4 +- test/queues/payload-types.ts | 49 ++++++++++++++++- 5 files changed, 89 insertions(+), 25 deletions(-) diff --git a/packages/payload/src/queues/config/types/index.ts b/packages/payload/src/queues/config/types/index.ts index 1aaa4b8fa19..397743df510 100644 --- a/packages/payload/src/queues/config/types/index.ts +++ b/packages/payload/src/queues/config/types/index.ts @@ -182,9 +182,9 @@ export type AfterScheduleFn = ( args: { defaultAfterSchedule: AfterScheduleFn /** - * payload-job-stats global data + * payload-job-stats global data. If the global does not exist, it will be null. */ - jobStats: JobStats + jobStats: JobStats | null queueable: Queueable req: PayloadRequest } & ( diff --git a/packages/payload/src/queues/operations/handleSchedules/defaultAfterSchedule.ts b/packages/payload/src/queues/operations/handleSchedules/defaultAfterSchedule.ts index 11fd1ba5caa..db9eb56660e 100644 --- a/packages/payload/src/queues/operations/handleSchedules/defaultAfterSchedule.ts +++ b/packages/payload/src/queues/operations/handleSchedules/defaultAfterSchedule.ts @@ -2,14 +2,13 @@ import type { AfterScheduleFn } from '../../config/types/index.js' import { type JobStats, jobStatsGlobalSlug } from '../../config/global.js' -// JobStats['stats']['scheduledRuns]['queues'] but correct, handling the undefined cases type JobStatsScheduledRuns = NonNullable< NonNullable['scheduledRuns']>['queues'] >[string] export const defaultAfterSchedule: AfterScheduleFn = async ({ jobStats, queueable, req }) => { const existingQueuesConfig = - jobStats.stats?.scheduledRuns?.queues?.[queueable.scheduleConfig.queue] || {} + jobStats?.stats?.scheduledRuns?.queues?.[queueable.scheduleConfig.queue] || {} const queueConfig: JobStatsScheduledRuns = { ...existingQueuesConfig, @@ -25,22 +24,40 @@ export const defaultAfterSchedule: AfterScheduleFn = async ({ jobStats, queueabl } // Add to payload-jobs-stats global regardless of the status - await req.payload.db.updateGlobal({ - slug: jobStatsGlobalSlug, - data: { - ...(jobStats.stats || {}), - stats: { - ...(jobStats.stats || {}), - scheduledRuns: { - ...(jobStats.stats?.scheduledRuns || {}), - queues: { - ...(jobStats.stats?.scheduledRuns?.queues || {}), - [queueable.scheduleConfig.queue]: queueConfig, + if (jobStats) { + await req.payload.db.updateGlobal({ + slug: jobStatsGlobalSlug, + data: { + ...(jobStats || {}), + stats: { + ...(jobStats?.stats || {}), + scheduledRuns: { + ...(jobStats?.stats?.scheduledRuns || {}), + queues: { + ...(jobStats?.stats?.scheduledRuns?.queues || {}), + [queueable.scheduleConfig.queue]: queueConfig, + }, }, }, - }, - } as JobStats, - req, - returning: false, - }) + } as JobStats, + req, + returning: false, + }) + } else { + await req.payload.db.createGlobal({ + slug: jobStatsGlobalSlug, + data: { + createdAt: new Date().toISOString(), + stats: { + scheduledRuns: { + queues: { + [queueable.scheduleConfig.queue]: queueConfig, + }, + }, + }, + } as JobStats, + req, + returning: false, + }) + } } diff --git a/packages/payload/src/queues/operations/handleSchedules/index.ts b/packages/payload/src/queues/operations/handleSchedules/index.ts index 2d478945eab..63f5ae5c01a 100644 --- a/packages/payload/src/queues/operations/handleSchedules/index.ts +++ b/packages/payload/src/queues/operations/handleSchedules/index.ts @@ -79,7 +79,9 @@ export async function handleSchedules({ req }: { req: PayloadRequest }) { ? queueScheduleStats?.tasks?.[schedulable.taskConfig.slug]?.lastScheduledRun : queueScheduleStats?.workflows?.[schedulable.workflowConfig?.slug ?? '']?.lastScheduledRun - const nextRun = new Cron(schedulable.scheduleConfig.cron).nextRun(lastScheduledRun ?? null) + const nextRun = new Cron(schedulable.scheduleConfig.cron).nextRun( + lastScheduledRun ?? undefined, + ) if (!nextRun) { continue diff --git a/test/queues/config.ts b/test/queues/config.ts index bf74b1e45c1..259d28b1982 100644 --- a/test/queues/config.ts +++ b/test/queues/config.ts @@ -141,8 +141,8 @@ export default buildConfigWithDefaults({ afterSchedule: async (args) => { await args.defaultAfterSchedule(args) // Handles updating the payload-jobs-stats global args.req.payload.logger.info( - 'EverySecond task scheduled:', - args.status === 'success' ? args.job.id : 'failed to schedule', + 'EverySecond task scheduled: ' + + (args.status === 'success' ? String(args.job.id) : 'failed to schedule'), ) }, }, diff --git a/test/queues/payload-types.ts b/test/queues/payload-types.ts index 54945be89cf..b57cf325b09 100644 --- a/test/queues/payload-types.ts +++ b/test/queues/payload-types.ts @@ -88,14 +88,19 @@ export interface Config { db: { defaultIDType: string; }; - globals: {}; - globalsSelect: {}; + globals: { + 'payload-jobs-stats': PayloadJobsStat; + }; + globalsSelect: { + 'payload-jobs-stats': PayloadJobsStatsSelect | PayloadJobsStatsSelect; + }; locale: null; user: User & { collection: 'users'; }; jobs: { tasks: { + EverySecond: TaskEverySecond; UpdatePost: MyUpdatePostType; UpdatePostStep2: TaskUpdatePostStep2; CreateSimple: TaskCreateSimple; @@ -260,6 +265,7 @@ export interface PayloadJob { completedAt: string; taskSlug: | 'inline' + | 'EverySecond' | 'UpdatePost' | 'UpdatePostStep2' | 'CreateSimple' @@ -328,6 +334,7 @@ export interface PayloadJob { taskSlug?: | ( | 'inline' + | 'EverySecond' | 'UpdatePost' | 'UpdatePostStep2' | 'CreateSimple' @@ -511,6 +518,44 @@ export interface PayloadMigrationsSelect { updatedAt?: T; createdAt?: T; } +/** + * This interface was referenced by `Config`'s JSON-Schema + * via the `definition` "payload-jobs-stats". + */ +export interface PayloadJobsStat { + id: string; + stats?: + | { + [k: string]: unknown; + } + | unknown[] + | string + | number + | boolean + | null; + updatedAt?: string | null; + createdAt?: string | null; +} +/** + * This interface was referenced by `Config`'s JSON-Schema + * via the `definition` "payload-jobs-stats_select". + */ +export interface PayloadJobsStatsSelect { + stats?: T; + updatedAt?: T; + createdAt?: T; + globalType?: T; +} +/** + * This interface was referenced by `Config`'s JSON-Schema + * via the `definition` "TaskEverySecond". + */ +export interface TaskEverySecond { + input: { + message: string; + }; + output?: unknown; +} /** * This interface was referenced by `Config`'s JSON-Schema * via the `definition` "MyUpdatePostType". From bbd914ae877a92dca635f234277f0fb0c6a4f392 Mon Sep 17 00:00:00 2001 From: Alessio Gravili Date: Wed, 18 Jun 2025 23:02:16 -0700 Subject: [PATCH 15/18] better ecxample logging, improve waitUntil field admin appearance --- packages/payload/src/queues/config/collection.ts | 3 +++ test/queues/config.ts | 6 +++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/packages/payload/src/queues/config/collection.ts b/packages/payload/src/queues/config/collection.ts index 2d22d318c71..794dc870220 100644 --- a/packages/payload/src/queues/config/collection.ts +++ b/packages/payload/src/queues/config/collection.ts @@ -199,6 +199,9 @@ export const getDefaultJobsCollection: (config: Config) => CollectionConfig = (c { name: 'waitUntil', type: 'date', + admin: { + date: { pickerAppearance: 'dayAndTime' }, + }, index: true, }, { diff --git a/test/queues/config.ts b/test/queues/config.ts index 259d28b1982..8b7c00b7b64 100644 --- a/test/queues/config.ts +++ b/test/queues/config.ts @@ -142,7 +142,11 @@ export default buildConfigWithDefaults({ await args.defaultAfterSchedule(args) // Handles updating the payload-jobs-stats global args.req.payload.logger.info( 'EverySecond task scheduled: ' + - (args.status === 'success' ? String(args.job.id) : 'failed to schedule'), + (args.status === 'success' + ? String(args.job.id) + : args.status === 'skipped' + ? 'skipped' + : 'error'), ) }, }, From 33092e19712858474d8514d7a263ee1edad79a14 Mon Sep 17 00:00:00 2001 From: Alessio Gravili Date: Thu, 19 Jun 2025 13:20:12 -0700 Subject: [PATCH 16/18] improve return types --- .../src/queues/endpoints/handleSchedules.ts | 5 +++- packages/payload/src/queues/localAPI.ts | 6 ++--- .../operations/handleSchedules/index.ts | 24 ++++++++++++++++++- 3 files changed, 30 insertions(+), 5 deletions(-) diff --git a/packages/payload/src/queues/endpoints/handleSchedules.ts b/packages/payload/src/queues/endpoints/handleSchedules.ts index d3147eb8f98..6da37daf0f1 100644 --- a/packages/payload/src/queues/endpoints/handleSchedules.ts +++ b/packages/payload/src/queues/endpoints/handleSchedules.ts @@ -42,11 +42,14 @@ export const handleSchedulesJobsEndpoint: Endpoint = { ) } - await handleSchedules({ req }) + const { errored, queued, skipped } = await handleSchedules({ req }) return Response.json( { + errored, message: req.i18n.t('general:success'), + queued, + skipped, }, { status: 200 }, ) diff --git a/packages/payload/src/queues/localAPI.ts b/packages/payload/src/queues/localAPI.ts index 9b777f4b75a..a67dc8a5cb6 100644 --- a/packages/payload/src/queues/localAPI.ts +++ b/packages/payload/src/queues/localAPI.ts @@ -10,15 +10,15 @@ import { type Where, } from '../index.js' import { jobAfterRead, jobsCollectionSlug } from './config/collection.js' -import { handleSchedules } from './operations/handleSchedules/index.js' +import { handleSchedules, type HandleSchedulesResult } from './operations/handleSchedules/index.js' import { runJobs } from './operations/runJobs/index.js' import { updateJob, updateJobs } from './utilities/updateJob.js' export const getJobsLocalAPI = (payload: Payload) => ({ - handleSchedules: async (args?: { req?: PayloadRequest }): Promise => { + handleSchedules: async (args?: { req?: PayloadRequest }): Promise => { const newReq: PayloadRequest = args?.req ?? (await createLocalReq({}, payload)) - await handleSchedules({ + return await handleSchedules({ req: newReq, }) }, diff --git a/packages/payload/src/queues/operations/handleSchedules/index.ts b/packages/payload/src/queues/operations/handleSchedules/index.ts index 63f5ae5c01a..fa61fdaf5ad 100644 --- a/packages/payload/src/queues/operations/handleSchedules/index.ts +++ b/packages/payload/src/queues/operations/handleSchedules/index.ts @@ -10,6 +10,12 @@ import { type JobStats, jobStatsGlobalSlug } from '../../config/global.js' import { defaultAfterSchedule } from './defaultAfterSchedule.js' import { defaultBeforeSchedule } from './defaultBeforeSchedule.js' +export type HandleSchedulesResult = { + errored: Queueable[] + queued: Queueable[] + skipped: Queueable[] +} + /** * On vercel, we cannot auto-schedule jobs using a Cron - instead, we'll use this same endpoint that can * also be called from Vercel Cron for auto-running jobs. @@ -17,7 +23,11 @@ import { defaultBeforeSchedule } from './defaultBeforeSchedule.js' * The benefit of doing it like this instead of a separate endpoint is that we can run jobs immediately * after they are scheduled */ -export async function handleSchedules({ req }: { req: PayloadRequest }) { +export async function handleSchedules({ + req, +}: { + req: PayloadRequest +}): Promise { const jobsConfig = req.payload.config.jobs const tasksWithSchedules = @@ -103,6 +113,10 @@ export async function handleSchedules({ req }: { req: PayloadRequest }) { } } + const queued: Queueable[] = [] + const skipped: Queueable[] = [] + const errored: Queueable[] = [] + /** * Now queue, but check for constraints (= beforeSchedule) first. * Default constraint (= defaultBeforeSchedule): max. 1 running / scheduled task or workflow per queue @@ -135,6 +149,7 @@ export async function handleSchedules({ req }: { req: PayloadRequest }) { req, status: 'skipped', }) + skipped.push(queueable) continue } @@ -156,6 +171,7 @@ export async function handleSchedules({ req }: { req: PayloadRequest }) { req, status: 'success', }) + queued.push(queueable) } catch (error) { await (afterScheduleFN ?? defaultAfterSchedule)({ // @ts-expect-error we know defaultAfterchedule will never call itself => pass null @@ -166,6 +182,12 @@ export async function handleSchedules({ req }: { req: PayloadRequest }) { req, status: 'error', }) + errored.push(queueable) } } + return { + errored, + queued, + skipped, + } } From 0c90da80db6b10479a9b25f56c38e41e9b2ac782 Mon Sep 17 00:00:00 2001 From: Alessio Gravili Date: Thu, 19 Jun 2025 13:28:20 -0700 Subject: [PATCH 17/18] fix: do not shut down jobs on HMR, as we never start them up again afterwards --- packages/payload/src/index.ts | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/packages/payload/src/index.ts b/packages/payload/src/index.ts index 0676c832c0e..c78e20227ef 100644 --- a/packages/payload/src/index.ts +++ b/packages/payload/src/index.ts @@ -805,12 +805,12 @@ export class BasePayload { await Promise.all( cronJobs.map((cronConfig) => { - const job = new Cron(cronConfig.cron ?? DEFAULT_CRON, async () => { + const jobAutorunCron = new Cron(cronConfig.cron ?? DEFAULT_CRON, async () => { if (typeof this.config.jobs.shouldAutoRun === 'function') { const shouldAutoRun = await this.config.jobs.shouldAutoRun(this) if (!shouldAutoRun) { - job.stop() + jobAutorunCron.stop() return } } @@ -821,7 +821,7 @@ export class BasePayload { }) }) - this.crons.push(job) + this.crons.push(jobAutorunCron) }), ) } @@ -871,8 +871,10 @@ export const reload = async ( payload: Payload, skipImportMapGeneration?: boolean, ): Promise => { - await payload.destroy() - + if (typeof payload.db.destroy === 'function') { + // Only destroy db, as we then later only call payload.db.init and not payload.init + await payload.db.destroy() + } payload.config = config payload.collections = config.collections.reduce( From e61ebab0eff96589213f0327c647493d8201163f Mon Sep 17 00:00:00 2001 From: Alessio Gravili Date: Thu, 19 Jun 2025 13:35:06 -0700 Subject: [PATCH 18/18] feat: conditionally add meta field to jobs collection --- packages/payload/src/config/sanitize.ts | 62 +++++++++---------- .../payload/src/queues/config/collection.ts | 23 ++++--- test/queues/payload-types.ts | 10 +++ 3 files changed, 57 insertions(+), 38 deletions(-) diff --git a/packages/payload/src/config/sanitize.ts b/packages/payload/src/config/sanitize.ts index 4f9b64bbba7..0d0b0938273 100644 --- a/packages/payload/src/config/sanitize.ts +++ b/packages/payload/src/config/sanitize.ts @@ -301,37 +301,6 @@ export const sanitizeConfig = async (incomingConfig: Config): Promise defaultAmount) { - // eslint-disable-next-line no-console - console.warn( - `The jobsCollectionOverrides function is returning a collection with an additional ${hookKey} hook defined. These hooks will not run unless the jobs.runHooks option is set to true. Setting this option to true will negatively impact performance.`, - ) - break - } - } - } - } - const sanitizedJobsCollection = await sanitizeCollection( - config as unknown as Config, - defaultJobsCollection, - richTextSanitizationPromises, - validRelationships, - ) - - ;(config.collections ??= []).push(sanitizedJobsCollection) - // Check for schedule property in both tasks and workflows let hasScheduleProperty = config?.jobs?.tasks?.length && config.jobs.tasks.some((task) => task.schedule) @@ -362,6 +331,37 @@ export const sanitizeConfig = async (incomingConfig: Config): Promise defaultAmount) { + // eslint-disable-next-line no-console + console.warn( + `The jobsCollectionOverrides function is returning a collection with an additional ${hookKey} hook defined. These hooks will not run unless the jobs.runHooks option is set to true. Setting this option to true will negatively impact performance.`, + ) + break + } + } + } + } + const sanitizedJobsCollection = await sanitizeCollection( + config as unknown as Config, + defaultJobsCollection, + richTextSanitizationPromises, + validRelationships, + ) + + ;(config.collections ??= []).push(sanitizedJobsCollection) } configWithDefaults.collections!.push( diff --git a/packages/payload/src/queues/config/collection.ts b/packages/payload/src/queues/config/collection.ts index 794dc870220..d910f0a16fb 100644 --- a/packages/payload/src/queues/config/collection.ts +++ b/packages/payload/src/queues/config/collection.ts @@ -1,5 +1,5 @@ import type { CollectionConfig } from '../../collections/config/types.js' -import type { Config, SanitizedConfig } from '../../config/types.js' +import type { SanitizedConfig } from '../../config/types.js' import type { Field } from '../../fields/config/types.js' import type { Job } from '../../index.js' @@ -9,18 +9,20 @@ import { getJobTaskStatus } from '../utilities/getJobTaskStatus.js' export const jobsCollectionSlug = 'payload-jobs' -export const getDefaultJobsCollection: (config: Config) => CollectionConfig = (config) => { +export const getDefaultJobsCollection: (jobsConfig: SanitizedConfig['jobs']) => CollectionConfig = ( + jobsConfig, +) => { const workflowSlugs: Set = new Set() const taskSlugs: Set = new Set(['inline']) - if (config.jobs?.workflows?.length) { - config.jobs?.workflows.forEach((workflow) => { + if (jobsConfig.workflows?.length) { + jobsConfig.workflows.forEach((workflow) => { workflowSlugs.add(workflow.slug) }) } - if (config.jobs?.tasks?.length) { - config.jobs.tasks.forEach((task) => { + if (jobsConfig.tasks?.length) { + jobsConfig.tasks.forEach((task) => { if (workflowSlugs.has(task.slug)) { throw new Error( `Task slug "${task.slug}" is already used by a workflow. No tasks are allowed to have the same slug as a workflow.`, @@ -79,7 +81,7 @@ export const getDefaultJobsCollection: (config: Config) => CollectionConfig = (c }, ] - if (config?.jobs?.addParentToTaskLog) { + if (jobsConfig.addParentToTaskLog) { logFields.push({ name: 'parent', type: 'group', @@ -241,6 +243,13 @@ export const getDefaultJobsCollection: (config: Config) => CollectionConfig = (c lockDocuments: false, } + if (jobsConfig.enabledStats) { + // TODO: In 4.0, this should be added by default. + jobsCollection.fields.push({ + name: 'meta', + type: 'json', + }) + } return jobsCollection } diff --git a/test/queues/payload-types.ts b/test/queues/payload-types.ts index b57cf325b09..d512621e440 100644 --- a/test/queues/payload-types.ts +++ b/test/queues/payload-types.ts @@ -350,6 +350,15 @@ export interface PayloadJob { queue?: string | null; waitUntil?: string | null; processing?: boolean | null; + meta?: + | { + [k: string]: unknown; + } + | unknown[] + | string + | number + | boolean + | null; updatedAt: string; createdAt: string; } @@ -483,6 +492,7 @@ export interface PayloadJobsSelect { queue?: T; waitUntil?: T; processing?: T; + meta?: T; updatedAt?: T; createdAt?: T; }