diff --git a/ghost/core/test/integration/jobs/job-queue.test.js b/ghost/core/test/integration/jobs/job-queue.test.js deleted file mode 100644 index 81ca8f18dd5..00000000000 --- a/ghost/core/test/integration/jobs/job-queue.test.js +++ /dev/null @@ -1,119 +0,0 @@ -const assert = require('assert/strict'); -const path = require('path'); -const configUtils = require('../../utils/configUtils'); -const models = require('../../../core/server/models'); -const testUtils = require('../../utils/'); -const events = require('../../../core/server/lib/common/events'); - -// Helper function to wait for job completion -async function waitForJobCompletion(jobName, maxWaitTimeMs = 5000, checkIntervalMs = 50) { - return new Promise((resolve, reject) => { - const startTime = Date.now(); - const intervalId = setInterval(async () => { - if (Date.now() - startTime >= maxWaitTimeMs) { - clearInterval(intervalId); - reject(new Error(`Job ${jobName} did not complete within ${maxWaitTimeMs}ms`)); - } - const job = await models.Job.findOne({name: jobName}); - if (!job) { - clearInterval(intervalId); - resolve(); - } - }, checkIntervalMs); - }); -} - -describe('Job Queue', function () { - let jobService; - before(testUtils.setup('default')); // this generates the tables in the db - afterEach(async function () { - await configUtils.restore(); - }); - afterEach(testUtils.teardownDb); - - describe('enabled by config', function () { - beforeEach(async function () { - configUtils.set('services:jobs:queue:enabled', true); - jobService = require('../../../core/server/services/jobs/job-service'); - }); - - it('should add and execute a job in the queue', async function () { - this.timeout(10000); - const job = { - name: `add-random-numbers-${Date.now()}`, - metadata: { - job: path.resolve(__dirname, './test-job.js'), - data: {} - } - }; - - // Add the job to the queue - const result = await jobService.addQueuedJob(job); - assert.ok(result); - - // Wait for the job to complete - await waitForJobCompletion(job.name, 8000); // Increase wait time - - // Verify that the job no longer exists in the queue - const jobEntry = await models.Job.findOne({name: job.name}); - assert.equal(jobEntry, null); - }); - - it('should emit events if present in result', async function () { - this.timeout(10000); - const job = { - name: `emit-events-${Date.now()}`, - metadata: { - job: path.resolve(__dirname, './test-job-events.js'), - data: {} - } - }; - - let eventEmitted = false; - let eventData = null; - - // Set up the event listener - events.on('member.edited', (data) => { - eventEmitted = true; - eventData = data; - }); - - const result = await jobService.addQueuedJob(job); - assert.ok(result); - - await waitForJobCompletion(job.name, 8000); // Increase wait time - - // Assert that the event was emitted - assert.ok(eventEmitted, 'Expected job.completed event to be emitted'); - assert.ok(eventData, 'Expected event data to be captured'); - - const jobEntry = await models.Job.findOne({name: job.name}); - assert.equal(jobEntry, null); - }); - }); - - describe('not enabled', function () { - beforeEach(async function () { - configUtils.set('services:jobs:queue:enabled', false); - jobService = require('../../../core/server/services/jobs/job-service'); - }); - - it('should not add a job to the queue when disabled', async function () { - const job = { - name: `add-random-numbers-${Date.now()}`, - metadata: { - job: path.resolve(__dirname, './test-job.js'), - data: {} - } - }; - - // Attempt to add the job to the queue - const result = await jobService.addQueuedJob(job); - assert.equal(result, undefined); - - // Verify that the job doesn't exist in the queue - const jobEntry = await models.Job.findOne({name: job.name}); - assert.equal(jobEntry, null); - }); - }); -}); \ No newline at end of file diff --git a/ghost/core/test/integration/jobs/test-job-events.js b/ghost/core/test/integration/jobs/test-job-events.js deleted file mode 100644 index 02f2ab73a2c..00000000000 --- a/ghost/core/test/integration/jobs/test-job-events.js +++ /dev/null @@ -1,19 +0,0 @@ -/** - * A job that simulates an event-driven process. - * @returns {Object} An object containing the event data. - */ -module.exports = function jobWithEvents() { - const num1 = Math.floor(Math.random() * 100); - const num2 = Math.floor(Math.random() * 100); - const result = num1 + num2; - - return { - success: true, - data: { - result: result - }, - eventData: { - events: [{name: 'member.edited', data: {id: '1'}}] - } - }; -}; \ No newline at end of file diff --git a/ghost/core/test/integration/jobs/test-job.js b/ghost/core/test/integration/jobs/test-job.js deleted file mode 100644 index c2d09320e8c..00000000000 --- a/ghost/core/test/integration/jobs/test-job.js +++ /dev/null @@ -1,7 +0,0 @@ -module.exports = function testJob() { - const num1 = Math.floor(Math.random() * 100); - const num2 = Math.floor(Math.random() * 100); - const result = num1 + num2; - - return result; -}; \ No newline at end of file diff --git a/ghost/i18n/locales/vi/comments.json b/ghost/i18n/locales/vi/comments.json index a4358a7034b..450997e85a8 100644 --- a/ghost/i18n/locales/vi/comments.json +++ b/ghost/i18n/locales/vi/comments.json @@ -13,7 +13,7 @@ "Are you sure?": "Bạn có chắc không?", "Become a member of {{publication}} to start commenting.": "Trở thành thành viên của {{publication}} để có thể bình luận.", "Become a paid member of {{publication}} to start commenting.": "Trở thành thành viên trả phí của {{publication}} để có thể bình luận.", - "Best": "", + "Best": "Hay nhất", "Cancel": "Hủy", "Comment": "Bình luận", "Complete your profile": "Hoàn tất hồ sơ của bạn", @@ -41,8 +41,8 @@ "Member discussion": "Thảo luận của thành viên", "Name": "Tên", "Neurosurgeon": "Bác sĩ phẫu thuật thần kinh", - "Newest": "", - "Oldest": "", + "Newest": "Mới nhất", + "Oldest": "Cũ nhất", "Once deleted, this comment can’t be recovered.": "Một khi đã xóa, bình luận này không thể khôi phục.", "One hour ago": "Một giờ trước", "One min ago": "Một phút trước", diff --git a/ghost/job-manager/README.md b/ghost/job-manager/README.md index 714dadd77c5..d870ab7ef4a 100644 --- a/ghost/job-manager/README.md +++ b/ghost/job-manager/README.md @@ -1,12 +1,11 @@ # Job Manager -A manager for background jobs in Ghost, supporting one-off tasks, recurring jobs, and persistent job queues. +A manager for background jobs in Ghost, supporting one-off tasks and recurring jobs. ## Table of Contents - [Quick Start](#quick-start) - [Job Types](#job-types) - [Background Job Requirements](#background-job-requirements) -- [Configuration](#configuration) - [Advanced Usage](#advanced-usage) - [Technical Details](#technical-details) @@ -29,20 +28,11 @@ jobManager.addJob({ at: 'every 5 minutes', job: './jobs/check-emails.js' }); - -// Persisted background job -jobManager.addQueuedJob({ - name: 'update-member-analytics-123', - metadata: { - job: './jobs/update-analytics.js', - data: { memberId: '123' } - } -}); ``` ## Job Types -Ghost supports three types of jobs: +Ghost supports two types of jobs: 1. **Inline Jobs** - Run in the main Ghost process @@ -54,15 +44,9 @@ Ghost supports three types of jobs: - Good for CPU-intensive tasks - Can be scheduled or recurring -3. **Persisted Queue Jobs** - - Stored in database - - Survive server restarts - - Best for background tasks - - Can be monitored and retried - ## Background Job Requirements -Both offloaded and queued jobs must: +Offloaded jobs must: - Have a unique name - Be idempotent (safe to run multiple times) @@ -70,23 +54,6 @@ Both offloaded and queued jobs must: - Import their own dependencies - Primarily use DB or API calls -## Configuration - -The job queue can be configured through Ghost's config: - -```js -jobs: { - queue: { - enabled: true, - maxWorkers: 1, - pollMinInterval: 1000, // 1 sec - pollMaxInterval: 60000, // 1 min - queueCapacity: 500, // Max queued jobs - fetchCount: 500 // Max jobs per poll - } -} -``` - ## Advanced Usage Below is a sample code to wire up job manger and initialize jobs. This is the simplest way to interact with the job manager - these jobs do not persist after reboot: @@ -182,66 +149,3 @@ Offloaded jobs are running on dedicated worker threads which makes their lifecyc 4. When **exceptions** happen and expected outcome is to terminate current job, leave the exception unhandled allowing it to bubble up to the job manager. Unhandled exceptions [terminate current thread](https://nodejs.org/dist/latest-v14.x/docs/api/worker_threads.html#worker_threads_event_error) and allow for next scheduled job execution to happen. For more nuances on job structure best practices check [bree documentation](https://github.com/breejs/bree#writing-jobs-with-promises-and-async-await). - -### Implementation Notes -For any persisted tasks, the Job Manager has a queue based on the `jobs` table. This table is polled regularly and processed with a single worker, and is ideal for background tasks, e.g. updating member analytics. (see notes below about job types) - -```js -// the job manager is typically injected into the consumer service via the service wrapper -// from there -const JobManager = require('../../services/jobs'); -... - -// ** job submission should be handled by the wrapper service ** -// this could be via subscription to events, emitted by the wrapped service(s) -domainEvents.subscribe(MemberEmailAnalyticsUpdateEvent, async (event) => { - const memberId = event.data.memberId; - await JobManager.addQueuedJob({ - name: `update-member-email-analytics-${memberId}`, - metadata: { - job: path.resolve(__dirname, 'jobs/update-member-email-analytics'), - name: 'update-member-email-analytics', - data: { - memberId - } - } - }); -}); - -// or it could be passed through as a hook -const handleAnalyticsJobSubmission = async (memberId) => { - await JobManager.addQueuedJob({ - name: `update-member-email-analytics-${memberId}`, - metadata: { - job: path.resolve(__dirname, 'jobs/update-member-email-analytics'), - name: 'update-member-email-analytics', - data: { - memberId - } - } - }); -} -``` -In most cases, jobs should not be submitted by services directly. Because they must import what is needed, it would require too many injected dependencies. - -### Adjusting the Job Queue -The queue manager will poll the `jobs` table every minute unless jobs are being actively or were recently processed, where it will instead poll every second. - -The job queue has a few other config flags for the number of workers, polling rate, max jobs to process, etc. - -``` -services: { - jobs: { - queue: { - enabled: true, - reportStats: true, - maxWorkers: 1, - logLevel: 'info' | 'error', - pollMinInterval: 1000, // 1 sec - pollMaxInterval: 60000, // 1 min - queueCapacity: 500, // # of jobs in the process queue at any time - fetchCount: 500 // max # of jobs fetched in each poll - } - } -} -``` \ No newline at end of file diff --git a/ghost/job-manager/lib/JobManager.js b/ghost/job-manager/lib/JobManager.js index 5ba0a200443..fda14a80065 100644 --- a/ghost/job-manager/lib/JobManager.js +++ b/ghost/job-manager/lib/JobManager.js @@ -7,11 +7,9 @@ const Bree = require('bree'); const pWaitFor = require('p-wait-for'); const {UnhandledJobError, IncorrectUsageError} = require('@tryghost/errors'); const logging = require('@tryghost/logging'); -const metrics = require('@tryghost/metrics'); const isCronExpression = require('./is-cron-expression'); const assembleBreeJob = require('./assemble-bree-job'); const JobsRepository = require('./JobsRepository'); -const JobQueueManager = require('./JobQueueManager'); const worker = async (task, callback) => { try { @@ -40,7 +38,6 @@ const ALL_STATUSES = { class JobManager { #domainEvents; #completionPromises = new Map(); - #jobQueueManager = null; #config; #JobModel; #events; @@ -52,11 +49,9 @@ class JobManager { * @param {Object} [options.JobModel] - a model which can persist job data in the storage * @param {Object} [options.domainEvents] - domain events emitter * @param {Object} [options.config] - config - * @param {boolean} [options.isDuplicate] - if true, the job manager will not initialize the job queue - * @param {JobQueueManager} [options.jobQueueManager] - job queue manager instance (for testing) * @param {Object} [options.events] - events instance (for testing) */ - constructor({errorHandler, workerMessageHandler, JobModel, domainEvents, config, isDuplicate = false, jobQueueManager = null, events = null}) { + constructor({errorHandler, workerMessageHandler, JobModel, domainEvents, config, events = null}) { this.inlineQueue = fastq(this, worker, 3); this._jobMessageHandler = this._jobMessageHandler.bind(this); this._jobErrorHandler = this._jobErrorHandler.bind(this); @@ -95,19 +90,6 @@ class JobManager { if (JobModel) { this._jobsRepository = new JobsRepository({JobModel}); } - - if (jobQueueManager) { - this.#jobQueueManager = jobQueueManager; - } else if (!isDuplicate) { - this.#initializeJobQueueManager(); - } - } - - #initializeJobQueueManager() { - if (this.#config?.get('services:jobs:queue:enabled') === true && !this.#jobQueueManager) { - this.#jobQueueManager = new JobQueueManager({JobModel: this.#JobModel, config: this.#config, eventEmitter: this.#events, metricLogger: metrics}); - this.#jobQueueManager.init(); - } } inlineJobHandler(jobName) { @@ -128,35 +110,6 @@ class JobManager { }; } - /** - * @typedef {Object} QueuedJob - * @property {string} name - The name or identifier of the job. - * @property {Object} metadata - Metadata associated with the job. - * @property {string} metadata.job - The absolute path to the job to execute. - * @property {string} metadata.name - The name of the job. Used for metrics. - * @property {Object} metadata.data - The data associated with the job. - */ - - /** - * @method addQueuedJob - * @async - * @description Adds a new job to the job repository, which will be polled and executed by the job queue manager. - * @param {QueuedJob} job - The job to be added to the queue. - * @returns {Promise} The added job model. - */ - async addQueuedJob({name, metadata}) { - // Try to initialize JobQueueManager if it's missing - if (!this.#jobQueueManager) { - this.#initializeJobQueueManager(); - } - - if (this.#config?.get('services:jobs:queue:enabled') === true && this.#jobQueueManager) { - const model = await this.#jobQueueManager.addJob({name, metadata}); - return model; - } - return undefined; - } - async _jobMessageHandler({name, message}) { if (name) { if (message === ALL_STATUSES.started) { @@ -459,10 +412,6 @@ class JobManager { async shutdown(options) { await this.bree.stop(); - if (this.#jobQueueManager) { - await this.#jobQueueManager.shutdown(); - } - if (this.inlineQueue.idle()) { return; } diff --git a/ghost/job-manager/lib/JobQueueManager.js b/ghost/job-manager/lib/JobQueueManager.js deleted file mode 100644 index c45d80654d4..00000000000 --- a/ghost/job-manager/lib/JobQueueManager.js +++ /dev/null @@ -1,238 +0,0 @@ -const workerpool = require('workerpool'); -const path = require('path'); -const JobsRepository = require('./JobsRepository'); -const debug = require('@tryghost/debug')('job-manager:JobQueueManager'); -const logging = require('@tryghost/logging'); -const metrics = require('@tryghost/metrics'); - -class JobQueueManager { - constructor({JobModel, config, logger = logging, metricLogger = metrics, WorkerPool = workerpool, eventEmitter}) { - this.jobsRepository = new JobsRepository({JobModel}); - this.config = this.initializeConfig(config?.get('services:jobs:queue') || {}); - this.logger = this.createLogger(logger, this.config.logLevel); - this.metricLogger = metricLogger; - this.WorkerPool = WorkerPool; - this.pool = this.createWorkerPool(); - this.state = this.initializeState(); - this.eventEmitter = eventEmitter; - this.metricCache = { - jobCompletionCount: 0, - queueDepth: 0, - emailAnalyticsAggregateMemberStatsCount: 0 - }; - } - - createLogger(logger, logLevel) { - return { - debug: (message) => { - if (logLevel === 'debug') { - logger.debug(`[JobQueueManager] ${message}`); - } - }, - info: (message) => { - if (logLevel === 'info' || logLevel === 'debug') { - logger.info(`[JobQueueManager] ${message}`); - } - }, - error: (message, error) => { - if (logLevel === 'info' || logLevel === 'error' || logLevel === 'debug') { - logger.error(`[JobQueueManager] ${message}`, error); - } - } - }; - } - - initializeConfig(queueConfig) { - return { - MIN_POLL_INTERVAL: queueConfig.pollMinInterval || 1000, - MAX_POLL_INTERVAL: queueConfig.pollMaxInterval || 60000, - QUEUE_CAPACITY: queueConfig.queueCapacity || 500, - FETCH_COUNT: queueConfig.fetchCount || 500, - INCREASE_INTERVAL_THRESHOLD: 30000, - maxWorkers: queueConfig.maxWorkers, - reportStats: queueConfig.reportStats, - reportInterval: queueConfig.reportInterval || 60000, - logLevel: queueConfig.logLevel - }; - } - - initializeState() { - return { - currentPollInterval: this.config.MIN_POLL_INTERVAL, - lastFoundJobTime: Date.now(), - isPolling: false, - queuedJobs: new Set() - }; - } - - createWorkerPool() { - const poolOptions = { - workerType: 'thread', - workerTerminateTimeout: 10000 - }; - if (this.config.maxWorkers) { - poolOptions.maxWorkers = this.config.maxWorkers; - } - return this.WorkerPool.pool(path.join(__dirname, '/workers/generic-worker.js'), poolOptions); - } - - async init() { - debug('[JobQueueManager] Initializing job queue'); - this.startQueueProcessor(); - if (this.config.reportStats) { - this.reportStats(); - } - } - - async startQueueProcessor() { - const poll = async () => { - if (this.state.isPolling) { - this.logger.debug('Already polling, skipping this cycle'); - return; - } - - this.state.isPolling = true; - this.logger.debug(`Polling for jobs; current interval: ${Math.floor(this.state.currentPollInterval / 1000)}s`); - - try { - await this.processPendingJobs(); - } catch (error) { - this.logger.error('Error in queue filler:', error); - } finally { - this.state.isPolling = false; - this.queueFillerTimeout = setTimeout(poll, this.state.currentPollInterval); - } - }; - - poll(); // Initial poll - } - - async processPendingJobs() { - const stats = await this.getStats(); - if (stats.pendingTasks <= this.config.QUEUE_CAPACITY) { - this.logger.debug('Processing pending jobs'); - const entriesToAdd = Math.min(this.config.FETCH_COUNT, this.config.FETCH_COUNT - stats.pendingTasks); - const {data: jobs, total} = await this.jobsRepository.getQueuedJobs(entriesToAdd); - this.metricCache.queueDepth = total || 0; - this.logger.debug(`Adding up to ${entriesToAdd} queue entries. Current pending tasks: ${stats.pendingTasks}. Current worker count: ${stats.totalWorkers}. Current depth: ${total}.`); - this.updatePollInterval(jobs); - await this.processJobs(jobs); - } - } - - updatePollInterval(jobs) { - if (jobs.length > 0) { - this.state.lastFoundJobTime = Date.now(); - this.state.currentPollInterval = this.config.MIN_POLL_INTERVAL; - } else { - const timeSinceLastJob = Date.now() - this.state.lastFoundJobTime; - if (timeSinceLastJob > this.config.INCREASE_INTERVAL_THRESHOLD) { - this.state.currentPollInterval = this.config.MAX_POLL_INTERVAL; - } - } - } - - /** - * Emits events to the Node event emitter - * @param {Array<{name: string, data: any}>} events - The events to emit, e.g. member.edited - */ - emitEvents(events) { - events.forEach((e) => { - this.eventEmitter.emit(e.name, e.data); - }); - } - - async processJobs(jobs) { - for (const job of jobs) { - const jobMetadata = JSON.parse(job.get('metadata')); - const jobName = jobMetadata.name; - if (this.state.queuedJobs.has(jobName)) { - continue; - } - await this.executeJob(job, jobName, jobMetadata); - } - } - - async executeJob(job, jobName, jobMetadata) { - this.state.queuedJobs.add(jobName); - try { - /** - * @param {'executeJob'} jobName - This is the generic job execution fn - * @param {Array<{name: string, data: any}>} args - The arguments to pass to the job execution fn - * @returns {Promise<{success?: boolean, eventData?: {events: Array<{name: string, data: any}>}}>} - */ - const result = await this.pool.exec('executeJob', [jobMetadata.job, jobMetadata.data]); - await this.jobsRepository.delete(job.id); - this.metricCache.jobCompletionCount += 1; - if (jobName === 'update-member-email-analytics') { - this.metricCache.emailAnalyticsAggregateMemberStatsCount += 1; - } - if (result && result.eventData) { - this.emitEvents(result.eventData.events); // this is nested within eventData because we may want to support DomainEvents emission as well - } - } catch (error) { - await this.handleJobError(job, jobMetadata, error); - } finally { - this.state.queuedJobs.delete(jobName); - } - } - - async handleJobError(job, jobMetadata, error) { - let errorMessage; - if (error instanceof Error) { - errorMessage = error.message; - } else if (typeof error === 'string') { - errorMessage = error; - } else { - errorMessage = JSON.stringify(error); - } - - const updateData = { - status: 'failed', - finished_at: new Date(), - metadata: JSON.stringify({ - ...jobMetadata, - error: errorMessage, - retries: (jobMetadata.retries || 0) + 1 - }) - }; - - await this.jobsRepository.update(job.id, updateData); - } - - async addJob({name, metadata}) { - const model = await this.jobsRepository.addQueuedJob({name, metadata}); - return model; - } - - async getStats() { - return this.pool.stats(); - } - - reportStats() { - setInterval(() => { - this._doReportStats(); - }, this.config.reportInterval); - } - - _doReportStats() { - const poolStats = this.pool.stats(); - const stats = { - ...poolStats, - ...this.metricCache - }; - const statsString = JSON.stringify(stats, null, 2); - this.logger.info(`Job Queue Stats: ${statsString}`); - this.metricLogger.metric('job_manager_queue', stats); - } - - async shutdown() { - try { - await this.pool.terminate(); - } catch (error) { - this.logger.error('Error terminating worker pool:', error); - } - } -} - -module.exports = JobQueueManager; \ No newline at end of file diff --git a/ghost/job-manager/lib/JobsRepository.js b/ghost/job-manager/lib/JobsRepository.js index e2c4f50348b..2646a8c805a 100644 --- a/ghost/job-manager/lib/JobsRepository.js +++ b/ghost/job-manager/lib/JobsRepository.js @@ -1,4 +1,3 @@ -const ObjectID = require('bson-objectid').default; const logging = require('@tryghost/logging'); /** @@ -53,71 +52,6 @@ class JobsRepository { await this._JobModel.edit(data, {id}); } - /** - * @method getNextQueuedJob - * @async - * @description Retrieves the next queued job from the database. - * @returns {Promise} The next queued job object if found, null otherwise. - */ - async getNextQueuedJob() { - const job = await this._JobModel.findOne({ - queue_entry: 1 - }); - return job; - } - - /** - * @method getQueuedJobs - * @async - * @description Retrieves a list of queued jobs from the database. - * @param {number} [limit=50] - The maximum number of jobs to retrieve. - * @returns {Promise} An object containing the queued job data and total count. - */ - async getQueuedJobs(limit = 50) { - const jobs = await this._JobModel.findPage({ - filter: 'queue_entry:1', - limit - }); - return {data: jobs.data, total: jobs.meta.pagination.total}; - } - - /** - * @typedef {Object} QueuedJob - * @property {string} name - The name or identifier of the job. - * @property {Object} metadata - Metadata associated with the job. - * @property {string} metadata.job - The absolute path to the job to execute. - * @property {Object} metadata.data - The data associated with the job. - */ - - /** - * @method addQueuedJob - * @async - * @description Adds a new queued job to the database. - * @param {QueuedJob} job - The job to be added to the queue. - * @returns {Promise} The added job object. - */ - async addQueuedJob({name, metadata}) { - let job; - await this._JobModel.transaction(async (transacting) => { - // Check if a job with this name already exist - const existingJob = await this._JobModel.findOne({name}, {transacting}); - if (!existingJob) { - // If no existing job, create a new one - job = await this._JobModel.add({ - id: new ObjectID().toHexString(), - name: name, - status: 'queued', - created_at: new Date(), - metadata: JSON.stringify(metadata), - queue_entry: 1 - }, {transacting}); - } - // If existingJob is found, do nothing (equivalent to IGNORE) - }); - - return job; // Will be undefined if the job already existed - } - /** * @method delete * @async diff --git a/ghost/job-manager/lib/workers/generic-worker.js b/ghost/job-manager/lib/workers/generic-worker.js deleted file mode 100644 index 05fe50a95b9..00000000000 --- a/ghost/job-manager/lib/workers/generic-worker.js +++ /dev/null @@ -1,64 +0,0 @@ -const errors = require('@tryghost/errors'); - -/** - * @module generic-worker - * @description A generic worker module for executing jobs in a worker pool. This allows consuming code to pass in a job file - * when calling for the worker pool to execute a job. - */ - -const workerpool = require('workerpool'); - -/** - * @function executeJob - * @description Executes a job by requiring the job module and calling it with the provided data. - * @param {string} jobPath - The absolute file path to the job module. - * @param {Object} jobData - The data to be passed to the job function as the first argument. - * @returns {Promise<*>} The result of the job execution. - * @throws {Error} If the job module doesn't export a function or if the execution fails. - */ -function executeJob(jobPath, jobData) { - let jobModule; - try { - jobModule = require(jobPath); - } catch (err) { - throw new errors.IncorrectUsageError({ - message: `Failed to load job module: ${err.message}`, - err - }); - } - - if (typeof jobModule !== 'function') { - throw new errors.IncorrectUsageError({ - message: `Job module at ${jobPath} does not export a function` - }); - } - - try { - return jobModule(jobData); - } catch (err) { - throw new errors.IncorrectUsageError({ - message: `Failed to execute job: ${err.message}`, - err - }); - } -} - -/** - * @function registerWorker - * @description Registers the executeJob function as a worker method with workerpool. - */ -function registerWorker() { - workerpool.worker({ - executeJob: executeJob - }); -} - -// Only register the worker if this file is being run directly -if (require.main === module) { - registerWorker(); -} - -module.exports = { - executeJob, - registerWorker -}; \ No newline at end of file diff --git a/ghost/job-manager/test/generic-worker.test.js b/ghost/job-manager/test/generic-worker.test.js deleted file mode 100644 index 34ceede37b6..00000000000 --- a/ghost/job-manager/test/generic-worker.test.js +++ /dev/null @@ -1,56 +0,0 @@ -const rewire = require('rewire'); -const sinon = require('sinon'); -const path = require('path'); -const GhostErrors = require('@tryghost/errors'); - -describe('Generic Worker', function () { - let genericWorker; - let workerpoolStub; - - beforeEach(function () { - workerpoolStub = { - worker: sinon.stub() - }; - - genericWorker = rewire('../lib/workers/generic-worker'); - genericWorker.__set__('workerpool', workerpoolStub); - }); - - describe('executeJob', function () { - it('should execute a valid job module', function () { - const jobPath = path.join(__dirname, 'mock-job.js'); - const jobData = {test: 'data'}; - const mockJobModule = sinon.stub().returns('job result'); - - genericWorker.__set__('require', p => (p === jobPath ? mockJobModule : require(p))); - - const result = genericWorker.executeJob(jobPath, jobData); - - mockJobModule.calledWith(jobData).should.be.true; - result.should.equal('job result'); - }); - - it('should throw an error if job module does not export a function', function () { - const jobPath = path.join(__dirname, 'invalid-job.js'); - const jobData = {test: 'data'}; - - genericWorker.__set__('require', p => (p === jobPath ? {} : require(p))); - - (() => genericWorker.executeJob(jobPath, jobData)).should.throw(GhostErrors.IncorrectUsageError, { - message: `Job module at ${jobPath} does not export a function` - }); - }); - - it('should throw an error if job execution fails', function () { - const jobPath = path.join(__dirname, 'failing-job.js'); - const jobData = {test: 'data'}; - const mockJobModule = sinon.stub().throws(new Error('Job execution failed')); - - genericWorker.__set__('require', p => (p === jobPath ? mockJobModule : require(p))); - - (() => genericWorker.executeJob(jobPath, jobData)).should.throw(GhostErrors.IncorrectUsageError, { - message: 'Failed to execute job: Job execution failed' - }); - }); - }); -}); \ No newline at end of file diff --git a/ghost/job-manager/test/job-manager.test.js b/ghost/job-manager/test/job-manager.test.js index 4f5258b8473..1b344d38811 100644 --- a/ghost/job-manager/test/job-manager.test.js +++ b/ghost/job-manager/test/job-manager.test.js @@ -18,16 +18,8 @@ const jobModelInstance = { } }; -const queuedJob = { - name: 'test-job', - metadata: { - job: path.resolve(__dirname, './jobs/simple.js'), - data: 'test data' - } -}; - describe('Job Manager', function () { - let stubConfig, stubJobQueueManager, jobManager; + let stubConfig, jobManager; beforeEach(function () { sandbox.stub(logging, 'info'); @@ -36,20 +28,12 @@ describe('Job Manager', function () { stubConfig = { get: sinon.stub().returns({ - enabled: true, - queue: { - enabled: true - } + enabled: true }) }; - stubJobQueueManager = { - addJob: sinon.stub().resolves({id: 'job1'}) - }; - jobManager = new JobManager({ - config: stubConfig, - jobQueueManager: stubJobQueueManager + config: stubConfig }); }); @@ -66,7 +50,6 @@ describe('Job Manager', function () { should.exist(jobManager.removeJob); should.exist(jobManager.shutdown); should.exist(jobManager.inlineJobHandler); - should.exist(jobManager.addQueuedJob); }); describe('Add a job', function () { @@ -678,23 +661,6 @@ describe('Job Manager', function () { }); }); - describe('Add a queued job', function () { - it('submits a job to the job queue if enabled', async function () { - stubConfig.get.returns(true); - const result = await jobManager.addQueuedJob(queuedJob); - should(stubJobQueueManager.addJob.calledOnce).be.true(); - should(stubJobQueueManager.addJob.firstCall.args[0]).deepEqual(queuedJob); - should(result).have.property('id', 'job1'); - }); - - it('does not submit a job to the job queue if disabled', async function () { - stubConfig.get.returns(false); - const result = await jobManager.addQueuedJob(queuedJob); - should(stubJobQueueManager.addJob.called).be.false(); - should(result).be.undefined(); - }); - }); - describe('Shutdown', function () { it('gracefully shuts down inline jobs', async function () { jobManager = new JobManager({config: stubConfig}); diff --git a/ghost/job-manager/test/job-queue-manager.test.js b/ghost/job-manager/test/job-queue-manager.test.js deleted file mode 100644 index d3659b57110..00000000000 --- a/ghost/job-manager/test/job-queue-manager.test.js +++ /dev/null @@ -1,439 +0,0 @@ -const sinon = require('sinon'); -const {expect} = require('chai'); -const JobQueueManager = require('../lib/JobQueueManager'); - -describe('JobQueueManager', function () { - let jobQueueManager; - let mockJobModel; - let mockConfig; - let mockLogger; - let mockMetricLogger; - let mockWorkerPool; - let mockEventEmitter; - beforeEach(function () { - mockJobModel = {}; - mockConfig = { - get: sinon.stub().returns({}) - }; - mockLogger = { - debug: sinon.stub(), - info: sinon.stub(), - error: sinon.stub() - }; - mockMetricLogger = { - metric: sinon.stub() - }; - mockWorkerPool = { - pool: sinon.stub().returns({ - exec: sinon.stub(), - stats: sinon.stub().returns({ - totalWorkers: 1, - busyWorkers: 0, - idleWorkers: 1, - activeTasks: 0 - }), - terminate: sinon.stub() - }) - }; - mockEventEmitter = { - emit: sinon.stub() - }; - - jobQueueManager = new JobQueueManager({ - JobModel: mockJobModel, - config: mockConfig, - logger: mockLogger, - metricLogger: mockMetricLogger, - WorkerPool: mockWorkerPool, - eventEmitter: mockEventEmitter - }); - }); - - afterEach(function () { - sinon.restore(); - }); - - describe('initialization', function () { - it('should initialize with provided dependencies', function () { - expect(jobQueueManager.jobsRepository).to.exist; - expect(jobQueueManager.config).to.exist; - expect(jobQueueManager.logger).to.exist; - expect(jobQueueManager.pool).to.exist; - }); - }); - - describe('init', function () { - it('should start the job queue manager', async function () { - const startQueueProcessorStub = sinon.stub(jobQueueManager, 'startQueueProcessor'); - const reportStatsStub = sinon.stub(jobQueueManager, 'reportStats'); - - await jobQueueManager.init(); - - expect(startQueueProcessorStub.calledOnce).to.be.true; - expect(reportStatsStub.called).to.be.false; - - // Test with reportStats enabled - jobQueueManager.config.reportStats = true; - await jobQueueManager.init(); - expect(reportStatsStub.calledOnce).to.be.true; - }); - - it('should call reportStats when config.reportStats is true', async function () { - const startQueueProcessorStub = sinon.stub(jobQueueManager, 'startQueueProcessor'); - const reportStatsStub = sinon.stub(jobQueueManager, 'reportStats'); - jobQueueManager.config.reportStats = true; - - await jobQueueManager.init(); - - expect(startQueueProcessorStub.calledOnce).to.be.true; - expect(reportStatsStub.calledOnce).to.be.true; - }); - }); - - describe('shutdown', function () { - it('should handle errors during shutdown', async function () { - const error = new Error('Termination error'); - jobQueueManager.pool.terminate.rejects(error); - const loggerErrorStub = sinon.stub(jobQueueManager.logger, 'error'); - - await jobQueueManager.shutdown(); - - expect(jobQueueManager.pool.terminate.calledOnce).to.be.true; - expect(loggerErrorStub.calledWith('Error terminating worker pool:', error)).to.be.true; - }); - - it('should stop the job queue manager without errors', async function () { - jobQueueManager.pool.terminate.resolves(); - const loggerErrorStub = sinon.stub(jobQueueManager.logger, 'error'); - - await jobQueueManager.shutdown(); - - expect(jobQueueManager.pool.terminate.calledOnce).to.be.true; - expect(loggerErrorStub.called).to.be.false; - }); - }); - - describe('addJob', function () { - it('should add a new job', async function () { - const mockJob = {name: 'testJob', metadata: {}}; - const addQueuedJobStub = sinon.stub(jobQueueManager.jobsRepository, 'addQueuedJob').resolves('jobId'); - - const result = await jobQueueManager.addJob(mockJob); - - expect(addQueuedJobStub.calledOnceWith(mockJob)).to.be.true; - expect(result).to.equal('jobId'); - }); - }); - - describe('processPendingJobs', function () { - it('should process pending jobs', async function () { - const mockStats = {pendingTasks: 0}; - const mockJobs = [{get: sinon.stub().returns('{}')}]; - - sinon.stub(jobQueueManager, 'getStats').resolves(mockStats); - sinon.stub(jobQueueManager.jobsRepository, 'getQueuedJobs').resolves({data: mockJobs, total: mockJobs.length}); - sinon.stub(jobQueueManager, 'updatePollInterval'); - sinon.stub(jobQueueManager, 'processJobs'); - - await jobQueueManager.processPendingJobs(); - - expect(jobQueueManager.jobsRepository.getQueuedJobs.calledOnce).to.be.true; - expect(jobQueueManager.updatePollInterval.calledOnceWith(mockJobs)).to.be.true; - expect(jobQueueManager.processJobs.calledOnceWith(mockJobs)).to.be.true; - expect(jobQueueManager.metricCache.queueDepth).to.equal(mockJobs.length); - }); - }); - - describe('createLogger', function () { - it('should create a logger with debug level', function () { - const logger = jobQueueManager.createLogger(mockLogger, 'debug'); - logger.debug('Test debug'); - logger.info('Test info'); - logger.error('Test error'); - expect(mockLogger.debug.calledOnce).to.be.true; - expect(mockLogger.info.calledOnce).to.be.true; - expect(mockLogger.error.calledOnce).to.be.true; - }); - - it('should create a logger with info level', function () { - const logger = jobQueueManager.createLogger(mockLogger, 'info'); - logger.debug('Test debug'); - logger.info('Test info'); - logger.error('Test error'); - expect(mockLogger.debug.called).to.be.false; - expect(mockLogger.info.calledOnce).to.be.true; - expect(mockLogger.error.calledOnce).to.be.true; - }); - - it('should create a logger with error level', function () { - const logger = jobQueueManager.createLogger(mockLogger, 'error'); - logger.info('Test info'); - logger.error('Test error'); - expect(mockLogger.info.called).to.be.false; - expect(mockLogger.error.calledOnce).to.be.true; - }); - }); - - describe('initializeConfig', function () { - it('should initialize config with default values', function () { - const config = jobQueueManager.initializeConfig({}); - expect(config.MIN_POLL_INTERVAL).to.equal(1000); - expect(config.MAX_POLL_INTERVAL).to.equal(60000); - expect(config.QUEUE_CAPACITY).to.equal(500); - expect(config.FETCH_COUNT).to.equal(500); - }); - - it('should use provided values in config', function () { - const config = jobQueueManager.initializeConfig({ - pollMinInterval: 2000, - pollMaxInterval: 120000, - queueCapacity: 1000, - fetchCount: 100 - }); - expect(config.MIN_POLL_INTERVAL).to.equal(2000); - expect(config.MAX_POLL_INTERVAL).to.equal(120000); - expect(config.QUEUE_CAPACITY).to.equal(1000); - expect(config.FETCH_COUNT).to.equal(100); - }); - }); - - describe('startQueueProcessor', function () { - it('should start polling for jobs', async function () { - const clock = sinon.useFakeTimers(); - const processPendingJobsStub = sinon.stub(jobQueueManager, 'processPendingJobs').resolves(); - - jobQueueManager.startQueueProcessor(); - - // No need to tick the clock, as polling should start immediately - expect(processPendingJobsStub.calledOnce).to.be.true; - - // Optionally, we can test the next poll - await clock.tickAsync(jobQueueManager.state.currentPollInterval); - expect(processPendingJobsStub.calledTwice).to.be.true; - - clock.restore(); - }); - - it('should handle errors during polling', async function () { - const clock = sinon.useFakeTimers(); - const error = new Error('Test error'); - sinon.stub(jobQueueManager, 'processPendingJobs').rejects(error); - - // Create a stub for the logger.error method - const loggerErrorStub = sinon.stub(); - jobQueueManager.logger.error = loggerErrorStub; - - jobQueueManager.startQueueProcessor(); - - await clock.tickAsync(jobQueueManager.state.currentPollInterval); - expect(loggerErrorStub.calledWith('Error in queue filler:', error)).to.be.true; - - clock.restore(); - }); - }); - - describe('updatePollInterval', function () { - it('should set to MIN_POLL_INTERVAL when jobs are found', function () { - jobQueueManager.state.currentPollInterval = 60000; - jobQueueManager.updatePollInterval([{}]); - expect(jobQueueManager.state.currentPollInterval).to.equal(jobQueueManager.config.MIN_POLL_INTERVAL); - }); - - it('should set to MAX_POLL_INTERVAL when no jobs found for a while', function () { - const clock = sinon.useFakeTimers(); - jobQueueManager.state.lastFoundJobTime = Date.now() - 31000; - jobQueueManager.updatePollInterval([]); - expect(jobQueueManager.state.currentPollInterval).to.equal(jobQueueManager.config.MAX_POLL_INTERVAL); - clock.restore(); - }); - }); - - describe('processJobs', function () { - it('should process new jobs', async function () { - const executeJobStub = sinon.stub(jobQueueManager, 'executeJob').resolves(); - const jobs = [ - { - get: sinon.stub().returns('{"name": "testJob1"}'), - id: '1' - }, - { - get: sinon.stub().returns('{"name": "testJob2"}'), - id: '2' - } - ]; - await jobQueueManager.processJobs(jobs); - expect(executeJobStub.calledTwice).to.be.true; - }); - - it('should skip already queued jobs', async function () { - const executeJobStub = sinon.stub(jobQueueManager, 'executeJob').resolves(); - jobQueueManager.state.queuedJobs.add('testJob1'); - const jobs = [ - { - get: sinon.stub().returns('{"name": "testJob1"}'), - id: '1' - }, - { - get: sinon.stub().returns('{"name": "testJob2"}'), - id: '2' - } - ]; - await jobQueueManager.processJobs(jobs); - expect(executeJobStub.calledOnce).to.be.true; - expect(executeJobStub.calledWith(jobs[1], 'testJob2', {name: 'testJob2'})).to.be.true; - }); - }); - - describe('executeJob', function () { - it('should execute a job successfully', async function () { - const job = {id: '1', get: sinon.stub().returns('{"job": "testJob", "data": {}}')}; - const deleteStub = sinon.stub(jobQueueManager.jobsRepository, 'delete').resolves(); - - await jobQueueManager.executeJob(job, 'testJob', {job: 'testJob', data: {}}); - - expect(jobQueueManager.pool.exec.calledOnce).to.be.true; - expect(deleteStub.calledWith('1')).to.be.true; - expect(jobQueueManager.state.queuedJobs.has('testJob')).to.be.false; - }); - - it('should handle job execution errors', async function () { - const job = {id: '1', get: sinon.stub().returns('{"job": "testJob", "data": {}}')}; - const error = new Error('Test error'); - jobQueueManager.pool.exec.rejects(error); - const handleJobErrorStub = sinon.stub(jobQueueManager, 'handleJobError').resolves(); - - await jobQueueManager.executeJob(job, 'testJob', {job: 'testJob', data: {}}); - - expect(handleJobErrorStub.calledWith(job, {job: 'testJob', data: {}}, error)).to.be.true; - expect(jobQueueManager.state.queuedJobs.has('testJob')).to.be.false; - }); - - it('should increment the metricCache.jobCompletionCount metric', async function () { - const job = {id: '1', get: sinon.stub().returns('{"job": "testJob", "data": {}}')}; - sinon.stub(jobQueueManager.jobsRepository, 'delete').resolves(); - await jobQueueManager.executeJob(job, 'testJob', {job: 'testJob', data: {}}); - expect(jobQueueManager.metricCache.jobCompletionCount).to.equal(1); - }); - - it('should increment the metricCache.emailAnalyticsAggregateMemberStatsCount metric', async function () { - const job = {id: '1', get: sinon.stub().returns('{"job": "update-member-email-analytics", "data": {}}')}; - sinon.stub(jobQueueManager.jobsRepository, 'delete').resolves(); - await jobQueueManager.executeJob(job, 'update-member-email-analytics', {job: 'update-member-email-analytics', data: {}}); - expect(jobQueueManager.metricCache.emailAnalyticsAggregateMemberStatsCount).to.equal(1); - }); - - it('should emit events if present in result', async function () { - const job = {id: '1', get: sinon.stub().returns('{"job": "testJob", "data": {}}')}; - jobQueueManager.pool.exec.resolves({eventData: {events: [{name: 'member.edited', data: {id: '1'}}]}}); - sinon.stub(jobQueueManager.jobsRepository, 'delete').resolves(); - await jobQueueManager.executeJob(job, 'testJob', {job: 'testJob', data: {}}); - expect(mockEventEmitter.emit.calledOnce).to.be.true; - expect(mockEventEmitter.emit.calledWith('member.edited', {id: '1'})).to.be.true; - }); - }); - - describe('emitEvents', function () { - it('should emit events', function () { - jobQueueManager.emitEvents([{name: 'member.edited', data: {id: '1'}}]); - expect(mockEventEmitter.emit.calledOnce).to.be.true; - expect(mockEventEmitter.emit.calledWith('member.edited', {id: '1'})).to.be.true; - }); - - it('should handle multiple events', function () { - jobQueueManager.emitEvents([{name: 'member.edited', data: {id: '1'}}, {name: 'site.changed', data: {}}]); - expect(mockEventEmitter.emit.calledTwice).to.be.true; - expect(mockEventEmitter.emit.calledWith('member.edited', {id: '1'})).to.be.true; - expect(mockEventEmitter.emit.calledWith('site.changed', {})).to.be.true; - }); - }); - - describe('handleJobError', function () { - it('should handle Error object', async function () { - const job = {id: '1'}; - const jobMetadata = {retries: 0}; - - // Ensure jobsRepository is properly initialized - jobQueueManager.jobsRepository = jobQueueManager.jobsRepository || {}; - - // Create the stub on the jobsRepository - const updateStub = sinon.stub(jobQueueManager.jobsRepository, 'update').resolves(); - - const error = new Error('Test error'); - - await jobQueueManager.handleJobError(job, jobMetadata, error); - - expect(updateStub.called, 'update should be called').to.be.true; - expect(updateStub.calledOnce, 'update should be called once').to.be.true; - - const [calledId, calledUpdateData] = updateStub.args[0]; - - expect(calledId).to.equal('1'); - expect(calledUpdateData).to.deep.include({ - status: 'failed', - metadata: JSON.stringify({ - retries: 1, - error: 'Test error' - }) - }); - expect(calledUpdateData.finished_at).to.be.instanceOf(Date); - }); - }); - - describe('reportStats', function () { - it('should log the stats every reportInterval', async function () { - const clock = sinon.useFakeTimers(); - const reportStatsStub = sinon.stub(jobQueueManager, '_doReportStats'); - jobQueueManager.config.reportInterval = 1000; - jobQueueManager.reportStats(); - await clock.tickAsync(1000); - expect(reportStatsStub.calledOnce).to.be.true; - clock.restore(); - }); - - it('should not log the stats if reportStats is false', async function () { - const clock = sinon.useFakeTimers(); - jobQueueManager.config.reportStats = false; - const reportStatsStub = sinon.stub(jobQueueManager, '_doReportStats'); - jobQueueManager.reportStats(); - await clock.tickAsync(2000); - expect(reportStatsStub.called).to.be.false; - clock.restore(); - }); - }); - - describe('_doReportStats', function () { - it('should log the stats using the logger', function () { - const loggerInfoStub = sinon.stub(jobQueueManager.logger, 'info'); - jobQueueManager._doReportStats(); - const expectedStats = { - totalWorkers: 1, - busyWorkers: 0, - idleWorkers: 1, - activeTasks: 0, - jobCompletionCount: 0, - queueDepth: 0, - emailAnalyticsAggregateMemberStatsCount: 0 - }; - const expectedLog = `Job Queue Stats: ${JSON.stringify(expectedStats, null, 2)}`; - expect(loggerInfoStub.calledOnce).to.be.true; - expect(loggerInfoStub.calledWith(expectedLog)).to.be.true; - }); - - it('should log the stats using the metricLogger', function () { - jobQueueManager._doReportStats(); - const expectedStats = { - totalWorkers: 1, - busyWorkers: 0, - idleWorkers: 1, - activeTasks: 0, - jobCompletionCount: 0, - queueDepth: 0, - emailAnalyticsAggregateMemberStatsCount: 0 - }; - expect(mockMetricLogger.metric.calledOnce).to.be.true; - const args = mockMetricLogger.metric.args[0]; - expect(args[0]).to.equal('job_manager_queue'); - expect(args[1]).to.deep.equal(expectedStats); - }); - }); -}); \ No newline at end of file