Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 43 additions & 29 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ Cronut supports **in-memory** scheduling of jobs within a single JVM. JDBC and d
* [Jobs](#jobs)
+ [Job example](#job-example)
* [Triggers](#triggers)
- [`cronut.trigger/cron`: Simple Cron Scheduling](#cronuttriggercron-simple-cron-scheduling)
- [`cronut.trigger/interval`: Simple Interval Scheduling](#cronuttriggerinterval-simple-interval-scheduling)
- [`cronut.trigger/builder`: Full trigger definition](#cronuttriggerbuilder-full-trigger-definition)
- [`cronut.trigger/cron`: Simple Cron Scheduling](#cronuttriggercron-simple-cron-scheduling)
- [`cronut.trigger/interval`: Simple Interval Scheduling](#cronuttriggerinterval-simple-interval-scheduling)
- [`cronut.trigger/builder`: Full trigger definition](#cronuttriggerbuilder-full-trigger-definition)
* [Concurrent execution](#concurrent-execution)
+ [Global concurrent execution](#global-concurrent-execution)
+ [Job-specific concurrent execution](#job-specific-concurrent-execution)
Expand All @@ -47,12 +47,12 @@ Cronut provides access to the Quartz Scheduler, exposed via the `cronut/schedule

Create a scheduler with the following configuration:

1. `:concurrent-execution-disallowed?`: (optional, default false) - run all jobs with @DisableConcurrentExecution
2. `:update-check?`: (optional, default false) - check for Quartz updates on system startup.
1. `:concurrent-execution-disallowed?`: run all jobs with `@DisableConcurrentExecution`
2. `:update-check?`: check for Quartz updates on system startup.

````clojure
(cronut/scheduler {:concurrent-execution-disallowed? true
:update-check? false})
(cronut/scheduler {:concurrent-execution-disallowed? true ;; default false
:update-check? false}) ;; default false
````

### Scheduler lifecycle
Expand Down Expand Up @@ -84,12 +84,8 @@ To schedule jobs, you can

Each cronut job must implement the `org.quartz.Job` interface.

The expectation being that every job will reify that interface either directly via `reify` or by returning a `defrecord`
that implements the interface.

Cronut supports further Quartz configuration of jobs (identity, description, recovery, and durability) by expecting
those values to be assoc'd onto your job. You do not have to set them (in fact in most cases you can likely ignore
them), however if you do want that control you will likely use the `defrecord` approach as opposed to `reify`.
Cronut supports further Quartz configuration of jobs (identity, description, recovery, and durability) by passing an
`opts` map when scheduling your job.

Concurrent execution can be controlled on a per-job bases with the `disallow-concurrent-execution?` flag.

Expand All @@ -104,22 +100,31 @@ Concurrent execution can be controlled on a per-job bases with the `disallow-con

(let [scheduler (cronut/scheduler {:concurrent-execution-disallowed? true
:update-check? false})
defrecord-job (map->TestDefrecordJobImpl {:identity ["name1" "group2"]
:description "test job"
:recover? true
:durable? false})
defrecord-job (map->TestDefrecordJobImpl {})
reify-job (reify Job
(execute [_this _job-context]
(let [rand-id (str (UUID/randomUUID))]
(log/info rand-id "Reified Impl"))))]

(cronut/schedule-job scheduler (trigger/interval 1000) defrecord-job)
(cronut/schedule-job scheduler
(trigger/interval 1000)
defrecord-job
{:name "name1"
:group "group1"
:description "test job 1"
:recover? true
:durable? false})

(cronut/schedule-job scheduler
(trigger/builder {:type :cron
:cron "*/5 * * * * ?"
:misfire :do-nothing})
reify-job))
reify-job
{:name "name2"
:group "group1"
:description "test job 2"
:recover? false
:durable? true}))
````

## Triggers
Expand Down Expand Up @@ -153,20 +158,22 @@ The `cronut.trigger/builder` function supports the full set of Quartz configurat

````clojure
;; interval
(cronut.trigger/builder {:type :simple
(cronut.trigger/builder {:name "trigger1"
:group "group3"
:type :simple
:interval 3000
:repeat :forever
:identity ["trigger-two" "test"]
:description "sample simple trigger"
:start #inst "2019-01-01T00:00:00.000-00:00"
:end #inst "2019-02-01T00:00:00.000-00:00"
:misfire :ignore
:priority 5})

;;cron
(cronut.trigger/builder {:type :cron
(cronut.trigger/builder {:name "trigger2"
:group "group3"
:type :cron
:cron "*/6 * * * * ?"
:identity ["trigger-five" "test"]
:description "sample cron trigger"
:start #inst "2018-01-01T00:00:00.000-00:00"
:end #inst "2029-02-01T00:00:00.000-00:00"
Expand Down Expand Up @@ -207,7 +214,7 @@ See: integration test source: [test/cronut/integration-test.clj](test/cronut/int
(:import (java.util UUID)
(org.quartz Job)))

(defrecord TestDefrecordJobImpl [identity description recover? durable? test-dep disallowConcurrentExecution?]
(defrecord TestDefrecordJobImpl []
Job
(execute [this _job-context]
(log/info "Defrecord Impl:" this)))
Expand All @@ -231,10 +238,12 @@ See: integration test source: [test/cronut/integration-test.clj](test/cronut/int
(log/info "scheduling defrecord job on 1s interval")
(cronut/schedule-job scheduler
(trigger/interval 1000)
(map->TestDefrecordJobImpl {:identity ["name1" "group2"]
:description "test job"
:recover? true
:durable? false}))
(map->TestDefrecordJobImpl {})
{:name "name1"
:group "group2"
:description "test job"
:recover? true
:durable? false})

;; demonstrate scheduler can start with jobs, and jobs can start after scheduler
(cronut/start scheduler)
Expand All @@ -247,7 +256,12 @@ See: integration test source: [test/cronut/integration-test.clj](test/cronut/int
(trigger/builder {:type :cron
:cron "*/5 * * * * ?"
:misfire :do-nothing})
reify-job)
reify-job
{:name "name2"
:group "group2"
:description "test job 2"
:recover? false
:durable? true})

(async/<!! (async/timeout 15000))

Expand Down
44 changes: 21 additions & 23 deletions src/cronut.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
(:require [clojure.tools.logging :as log]
[cronut.job :as job]
[cronut.trigger :as trigger])
(:import (org.quartz JobDetail JobKey Scheduler Trigger TriggerBuilder)
(:import (org.quartz JobDetail Scheduler Trigger TriggerBuilder)
(org.quartz.impl StdSchedulerFactory)))

(defn scheduler
"Create a new Quartz scheduler:
:concurrent-execution-disallowed? - run all jobs with @DisableConcurrentExecution (default false)
:update-check? - check for Quartz updates on system startup (default: false)"
[{:keys [update-check? concurrent-execution-disallowed?]}]
^Scheduler [{:keys [update-check? concurrent-execution-disallowed?]}]
(log/infof "initializing scheduler")
(when-not update-check?
(System/setProperty "org.terracotta.quartz.skipUpdateCheck" "true")
Expand All @@ -29,31 +29,29 @@
[^Scheduler scheduler]
(= "true" (get (.getContext scheduler) "concurrentExecutionDisallowed?")))

(defn get-detail
"Get the job detail for a key"
[^Scheduler scheduler ^JobKey key]
(.getJobDetail scheduler key))

(defn schedule-job
^Trigger [^Scheduler scheduler ^TriggerBuilder trigger job]
(let [detail ^JobDetail (job/detail job (concurrent-execution-disallowed? scheduler))]
(if-let [^JobDetail previously-scheduled (get-detail scheduler (.getKey detail))]
(let [built (.build (.forJob trigger previously-scheduled))]
(log/info "scheduling new trigger for existing job" built previously-scheduled)
(.scheduleJob scheduler built)
built)
(let [built (.build trigger)]
(log/info "scheduling new job" built detail)
(.scheduleJob scheduler detail built)
built))))
(^Trigger [^Scheduler scheduler ^TriggerBuilder trigger job]
(schedule-job scheduler trigger job nil))
(^Trigger [^Scheduler scheduler ^TriggerBuilder trigger job opts]
(let [cce-disallowed? (concurrent-execution-disallowed? scheduler)
detail (job/detail job (update opts :disallow-concurrent-execution? #(or cce-disallowed? %1)))]
(if-let [^JobDetail previously-scheduled (.getJobDetail scheduler (.getKey detail))]
(let [built (.build (.forJob trigger previously-scheduled))]
(log/info "scheduling new trigger for existing job" (str (.getKey previously-scheduled)) opts)
(.scheduleJob scheduler built)
built)
(let [built (.build trigger)]
(log/info "scheduling new job" (str (.getKey detail)) opts)
(.scheduleJob scheduler detail built)
built)))))

(defn schedule-jobs
[^Scheduler scheduler jobs]
(log/infof "scheduling [%s] jobs" (count jobs))
(loop [schedule jobs
[^Scheduler scheduler schedule]
(log/infof "scheduling [%s] jobs" (count schedule))
(loop [schedule schedule
triggers []]
(if-let [{:keys [^TriggerBuilder trigger job]} (first schedule)]
(recur (rest schedule) (conj triggers (schedule-job scheduler trigger job)))
(if-let [{:keys [^TriggerBuilder trigger job opts]} (first schedule)]
(recur (rest schedule) (conj triggers (schedule-job scheduler trigger job opts)))
triggers)))

(defn pause-job
Expand Down
12 changes: 5 additions & 7 deletions src/cronut/job.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
(ns cronut.job
(:refer-clojure :exclude [key])
(:import (org.quartz DisallowConcurrentExecution Job JobBuilder JobDataMap JobExecutionException JobKey)
(:import (org.quartz DisallowConcurrentExecution Job JobBuilder JobDataMap JobDetail JobExecutionException JobKey)
(org.quartz.spi JobFactory TriggerFiredBundle)))

(defrecord ^{DisallowConcurrentExecution true} SerialProxyJob [proxied-job]
Expand Down Expand Up @@ -36,13 +36,11 @@
(->ProxyJob job))))))

(defn detail
[job concurrent-execution-disallowed?]
(let [{:keys [identity description recover? durable? disallow-concurrent-execution?]} job]
(.build (cond-> (-> (JobBuilder/newJob (if (or concurrent-execution-disallowed? ;; global concurrency disallowed flag
disallow-concurrent-execution?) ;; job specific concurrency disallowed flag
SerialProxyJob ProxyJob))
^JobDetail [job opts]
(let [{:keys [name group description recover? durable? disallow-concurrent-execution?]} opts]
(.build (cond-> (-> (JobBuilder/newJob (if disallow-concurrent-execution? SerialProxyJob ProxyJob))
(.setJobData (JobDataMap. {"job-impl" job})))
(seq identity) (.withIdentity (first identity) (second identity))
name (.withIdentity name group)
description (.withDescription description)
(boolean? recover?) (.requestRecovery recover?)
(boolean? durable?) (.storeDurably durable?)))))
Expand Down
4 changes: 2 additions & 2 deletions src/cronut/trigger.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

(defn base-builder
"Provide a base trigger-builder from configuration"
[{:keys [identity description start end priority]}]
[{:keys [name group description start end priority]}]
(cond-> (TriggerBuilder/newTrigger)
(seq identity) (.withIdentity (first identity) (second identity))
name (.withIdentity name group)
description (.withDescription description)
start (.startAt start)
(nil? start) (.startNow)
Expand Down
19 changes: 13 additions & 6 deletions test/cronut/integration_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
(:import (java.util UUID)
(org.quartz Job)))

(defrecord TestDefrecordJobImpl [identity description recover? durable? test-dep disallowConcurrentExecution?]
(defrecord TestDefrecordJobImpl []
Job
(execute [this _job-context]
(log/info "Defrecord Impl:" this)))
Expand All @@ -30,10 +30,12 @@
(log/info "scheduling defrecord job on 1s interval")
(cronut/schedule-job scheduler
(trigger/interval 1000)
(map->TestDefrecordJobImpl {:identity ["name1" "group2"]
:description "test job"
:recover? true
:durable? false}))
(map->TestDefrecordJobImpl {})
{:name "name1"
:group "group2"
:description "test job"
:recover? true
:durable? false})

;; demonstrate scheduler can start with jobs, and jobs can start after scheduler
(cronut/start scheduler)
Expand All @@ -46,7 +48,12 @@
(trigger/builder {:type :cron
:cron "*/5 * * * * ?"
:misfire :do-nothing})
reify-job)
reify-job
{:name "name2"
:group "group2"
:description "test job 2"
:recover? false
:durable? true})

(async/<!! (async/timeout 15000))

Expand Down
Loading