Skip to content

feat: scheduling jobs #12863

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 26 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
74f746f
initial API
AlessioGr Jun 12, 2025
fb81dc8
simplify
AlessioGr Jun 12, 2025
4f54278
Merge remote-tracking branch 'origin/main' into feat/schedule-jobs
AlessioGr Jun 13, 2025
ffed35b
Merge remote-tracking branch 'origin/main' into feat/schedule-jobs
AlessioGr Jun 13, 2025
a5916de
scheduler property
AlessioGr Jun 13, 2025
96fc231
Merge remote-tracking branch 'origin/main' into feat/schedule-jobs
AlessioGr Jun 15, 2025
35246cd
wip
AlessioGr Jun 15, 2025
36d2f90
Merge remote-tracking branch 'origin/main' into feat/schedule-jobs
AlessioGr Jun 17, 2025
b105425
typescript
AlessioGr Jun 17, 2025
b9924b0
Merge remote-tracking branch 'origin/main' into feat/schedule-jobs
AlessioGr Jun 17, 2025
ae2f0d9
done with the function
AlessioGr Jun 19, 2025
318bc04
add local api
AlessioGr Jun 19, 2025
3556edf
handleSchedules endpoint
AlessioGr Jun 19, 2025
5f63875
Merge remote-tracking branch 'origin/main' into feat/schedule-jobs
AlessioGr Jun 19, 2025
61725cb
bump croner
AlessioGr Jun 19, 2025
caca044
improve example
AlessioGr Jun 19, 2025
f38b580
fix incorrect croner usage
AlessioGr Jun 19, 2025
25228d9
Merge remote-tracking branch 'origin/main' into feat/schedule-jobs
AlessioGr Jun 19, 2025
0910627
fix: mongodb transform can error if db.updateGlobal is called without…
AlessioGr Jun 19, 2025
3a1c6e9
clarifies the jsdocs for that
AlessioGr Jun 19, 2025
9602a70
make it work
AlessioGr Jun 19, 2025
bbd914a
better ecxample logging, improve waitUntil field admin appearance
AlessioGr Jun 19, 2025
63d8734
Merge remote-tracking branch 'origin/main' into feat/schedule-jobs
AlessioGr Jun 19, 2025
33092e1
improve return types
AlessioGr Jun 19, 2025
0c90da8
fix: do not shut down jobs on HMR, as we never start them up again af…
AlessioGr Jun 19, 2025
e61ebab
feat: conditionally add meta field to jobs collection
AlessioGr Jun 19, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/db-mongodb/src/utilities/transform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,10 @@ export const transform = ({
parentIsLocalized = false,
validateRelationships = true,
}: Args) => {
if (!data) {
return null
}

if (Array.isArray(data)) {
for (const item of data) {
transform({ adapter, data: item, fields, globalSlug, operation, validateRelationships })
Expand Down
2 changes: 1 addition & 1 deletion packages/payload/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@
"busboy": "^1.6.0",
"ci-info": "^4.1.0",
"console-table-printer": "2.12.1",
"croner": "9.0.0",
"croner": "9.1.0",
"dataloader": "2.2.3",
"deepmerge": "4.3.1",
"file-type": "19.3.0",
Expand Down
38 changes: 35 additions & 3 deletions packages/payload/src/config/sanitize.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ import {
} from '../locked-documents/config.js'
import { getPreferencesCollection, preferencesCollectionSlug } from '../preferences/config.js'
import { getQueryPresetsConfig, queryPresetsCollectionSlug } from '../query-presets/config.js'
import { getDefaultJobsCollection, jobsCollectionSlug } from '../queues/config/index.js'
import { getDefaultJobsCollection, jobsCollectionSlug } from '../queues/config/collection.js'
import { getJobStatsGlobal } from '../queues/config/global.js'
import { flattenBlock } from '../utilities/flattenAllFields.js'
import { getSchedulePublishTask } from '../versions/schedule/job.js'
import { addDefaultsToConfig } from './defaults.js'
Expand Down Expand Up @@ -300,7 +301,38 @@ export const sanitizeConfig = async (incomingConfig: Config): Promise<SanitizedC

// Need to add default jobs collection before locked documents collections
if (config.jobs.enabled) {
let defaultJobsCollection = getDefaultJobsCollection(config as unknown as Config)
// Check for schedule property in both tasks and workflows
let hasScheduleProperty =
config?.jobs?.tasks?.length && config.jobs.tasks.some((task) => task.schedule)

if (
!hasScheduleProperty &&
config?.jobs?.workflows?.length &&
config.jobs.workflows.some((workflow) => workflow.schedule)
) {
hasScheduleProperty = true
}

if (hasScheduleProperty) {
if (!config.jobs?.scheduler) {
throw new InvalidConfiguration(
'The jobs.scheduler property must be set when using scheduled tasks or workflows. Otherwise, the schedule property has no effect.',
)
}
// Add payload-jobs-stats global for tracking when a job of a specific slug was last run
;(config.globals ??= []).push(
await sanitizeGlobal(
config as unknown as Config,
getJobStatsGlobal(config as unknown as Config),
richTextSanitizationPromises,
validRelationships,
),
)

config.jobs.enabledStats = true
}

let defaultJobsCollection = getDefaultJobsCollection(config.jobs)

if (typeof config.jobs.jobsCollectionOverrides === 'function') {
defaultJobsCollection = config.jobs.jobsCollectionOverrides({
Expand Down Expand Up @@ -329,7 +361,7 @@ export const sanitizeConfig = async (incomingConfig: Config): Promise<SanitizedC
validRelationships,
)

configWithDefaults.collections!.push(sanitizedJobsCollection)
;(config.collections ??= []).push(sanitizedJobsCollection)
}

configWithDefaults.collections!.push(
Expand Down
2 changes: 1 addition & 1 deletion packages/payload/src/database/defaultUpdateJobs.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { DatabaseAdapter, Job } from '../index.js'
import type { UpdateJobs } from './types.js'

import { jobsCollectionSlug } from '../queues/config/index.js'
import { jobsCollectionSlug } from '../queues/config/collection.js'

export const defaultUpdateJobs: UpdateJobs = async function updateMany(
this: DatabaseAdapter,
Expand Down
3 changes: 3 additions & 0 deletions packages/payload/src/database/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ export interface BaseDatabaseAdapter {
}
}

/**
* Updates a global that exists. If the global doesn't exist yet, this will not work - you should use `createGlobal` instead.
*/
updateGlobal: UpdateGlobal

updateGlobalVersion: UpdateGlobalVersion
Expand Down
17 changes: 9 additions & 8 deletions packages/payload/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -805,14 +805,13 @@ export class BasePayload {

await Promise.all(
cronJobs.map((cronConfig) => {
const job = new Cron(cronConfig.cron ?? DEFAULT_CRON, async () => {
const jobAutorunCron = new Cron(cronConfig.cron ?? DEFAULT_CRON, async () => {
if (typeof this.config.jobs.shouldAutoRun === 'function') {
const shouldAutoRun = await this.config.jobs.shouldAutoRun(this)

if (!shouldAutoRun) {
job.stop()

return false
jobAutorunCron.stop()
return
}
}

Expand All @@ -822,7 +821,7 @@ export class BasePayload {
})
})

this.crons.push(job)
this.crons.push(jobAutorunCron)
}),
)
}
Expand Down Expand Up @@ -872,8 +871,10 @@ export const reload = async (
payload: Payload,
skipImportMapGeneration?: boolean,
): Promise<void> => {
await payload.destroy()

if (typeof payload.db.destroy === 'function') {
// Only destroy db, as we then later only call payload.db.init and not payload.init
await payload.db.destroy()
}
payload.config = config

payload.collections = config.collections.reduce(
Expand Down Expand Up @@ -1459,7 +1460,7 @@ export type {
TabsPreferences,
} from './preferences/types.js'
export type { QueryPreset } from './query-presets/types.js'
export { jobAfterRead } from './queues/config/index.js'
export { jobAfterRead } from './queues/config/collection.js'
export type { JobsConfig, RunJobAccess, RunJobAccessArgs } from './queues/config/types/index.js'

export type {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,28 @@
import type { CollectionConfig } from '../../collections/config/types.js'
import type { Config, SanitizedConfig } from '../../config/types.js'
import type { SanitizedConfig } from '../../config/types.js'
import type { Field } from '../../fields/config/types.js'
import type { Job } from '../../index.js'

import { runJobsEndpoint } from '../restEndpointRun.js'
import { handleSchedulesJobsEndpoint } from '../endpoints/handleSchedules.js'
import { runJobsEndpoint } from '../endpoints/run.js'
import { getJobTaskStatus } from '../utilities/getJobTaskStatus.js'

export const jobsCollectionSlug = 'payload-jobs'

export const getDefaultJobsCollection: (config: Config) => CollectionConfig = (config) => {
export const getDefaultJobsCollection: (jobsConfig: SanitizedConfig['jobs']) => CollectionConfig = (
jobsConfig,
) => {
const workflowSlugs: Set<string> = new Set()
const taskSlugs: Set<string> = new Set(['inline'])

if (config.jobs?.workflows?.length) {
config.jobs?.workflows.forEach((workflow) => {
if (jobsConfig.workflows?.length) {
jobsConfig.workflows.forEach((workflow) => {
workflowSlugs.add(workflow.slug)
})
}

if (config.jobs?.tasks?.length) {
config.jobs.tasks.forEach((task) => {
if (jobsConfig.tasks?.length) {
jobsConfig.tasks.forEach((task) => {
if (workflowSlugs.has(task.slug)) {
throw new Error(
`Task slug "${task.slug}" is already used by a workflow. No tasks are allowed to have the same slug as a workflow.`,
Expand Down Expand Up @@ -78,7 +81,7 @@ export const getDefaultJobsCollection: (config: Config) => CollectionConfig = (c
},
]

if (config?.jobs?.addParentToTaskLog) {
if (jobsConfig.addParentToTaskLog) {
logFields.push({
name: 'parent',
type: 'group',
Expand All @@ -102,7 +105,7 @@ export const getDefaultJobsCollection: (config: Config) => CollectionConfig = (c
group: 'System',
hidden: true,
},
endpoints: [runJobsEndpoint],
endpoints: [runJobsEndpoint, handleSchedulesJobsEndpoint],
fields: [
{
name: 'input',
Expand Down Expand Up @@ -198,6 +201,9 @@ export const getDefaultJobsCollection: (config: Config) => CollectionConfig = (c
{
name: 'waitUntil',
type: 'date',
admin: {
date: { pickerAppearance: 'dayAndTime' },
},
index: true,
},
{
Expand Down Expand Up @@ -237,6 +243,13 @@ export const getDefaultJobsCollection: (config: Config) => CollectionConfig = (c
lockDocuments: false,
}

if (jobsConfig.enabledStats) {
// TODO: In 4.0, this should be added by default.
jobsCollection.fields.push({
name: 'meta',
type: 'json',
})
}
return jobsCollection
}

Expand Down
45 changes: 45 additions & 0 deletions packages/payload/src/queues/config/global.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import type { Config } from '../../config/types.js'
import type { GlobalConfig } from '../../globals/config/types.js'
import type { TaskType } from './types/taskTypes.js'
import type { WorkflowTypes } from './types/workflowTypes.js'

export const jobStatsGlobalSlug = 'payload-jobs-stats'

/**
* Type for data stored in the payload-jobs-stats global.
*/
export type JobStats = {
stats?: {
scheduledRuns?: {
queues?: {
[queueSlug: string]: {
tasks?: {
[taskSlug: TaskType]: {
lastScheduledRun: string
}
}
workflows?: {
[workflowSlug: WorkflowTypes]: {
lastScheduledRun: string
}
}
}
}
}
}
}

/**
* Global config for job statistics.
*/
export const getJobStatsGlobal: (config: Config) => GlobalConfig = (config) => {
return {
slug: jobStatsGlobalSlug,
fields: [
{
name: 'stats',
type: 'json',
},
],
}
}
Loading
Loading