diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java index 60a4bceaf99..27a481f7fa5 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java @@ -153,25 +153,25 @@ public String getNextScheduleTime() throws Exception { @Override public void removeAllJobs() throws Exception { - this.jobScheduler.removeAllJobs(); + this.jobScheduler.removeAllJobs(null); } @Override public void removeAllJobs(String startTime, String finishTime) throws Exception { long start = JobSupport.getDataTime(startTime); long finish = JobSupport.getDataTime(finishTime); - this.jobScheduler.removeAllJobs(start, finish); + this.jobScheduler.removeAllJobs(start, finish, null); } @Override public void removeAllJobsAtScheduledTime(String time) throws Exception { long removeAtTime = JobSupport.getDataTime(time); - this.jobScheduler.remove(removeAtTime); + this.jobScheduler.remove(removeAtTime, null); } @Override public void removeJob(String jobId) throws Exception { - this.jobScheduler.remove(jobId); + this.jobScheduler.remove(jobId, null); } @Override diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java index e95186175d1..b4d3b99170f 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java @@ -18,6 +18,7 @@ import java.util.List; +import org.apache.activemq.command.Message; import org.apache.activemq.util.ByteSequence; public interface JobScheduler { @@ -118,27 +119,34 @@ public interface JobScheduler { * * @param time * The UTC time to use to remove a batch of scheduled Jobs. + * @param message + * The message from management queue, that triggered the removal action * * @throws Exception */ - void remove(long time) throws Exception; + void remove(long time, Message message) throws Exception; /** * remove a job with the matching jobId * * @param jobId * The unique Job Id to search for and remove from the scheduled set of jobs. + * @param message + * The message from management queue, that triggered the removal action * * @throws Exception if an error occurs while removing the Job. */ - void remove(String jobId) throws Exception; + void remove(String jobId, Message message) throws Exception; /** * remove all the Jobs from the scheduler * + * @param message + * The message from management queue, that triggered the removal action + * * @throws Exception */ - void removeAllJobs() throws Exception; + void removeAllJobs(Message message) throws Exception; /** * remove all the Jobs from the scheduler that are due between the start and finish times @@ -147,9 +155,11 @@ public interface JobScheduler { * time in milliseconds * @param finish * time in milliseconds + * @param message + * The message from management queue, that triggered the removal action * @throws Exception */ - void removeAllJobs(long start, long finish) throws Exception; + void removeAllJobs(long start, long finish, Message message) throws Exception; /** * Get the next time jobs will be fired diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java index 24a216a7ee9..3180e29df58 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java @@ -19,6 +19,7 @@ import java.util.Collections; import java.util.List; +import org.apache.activemq.command.Message; import org.apache.activemq.util.ByteSequence; /** @@ -89,34 +90,34 @@ public long getNextScheduleTime() throws Exception { } @Override - public void remove(long time) throws Exception { + public void remove(long time, Message message) throws Exception { JobScheduler js = this.broker.getInternalScheduler(); if (js != null) { - js.remove(time); + js.remove(time, message); } } @Override - public void remove(String jobId) throws Exception { + public void remove(String jobId, Message message) throws Exception { JobScheduler js = this.broker.getInternalScheduler(); if (js != null) { - js.remove(jobId); + js.remove(jobId, message); } } @Override - public void removeAllJobs() throws Exception { + public void removeAllJobs(Message message) throws Exception { JobScheduler js = this.broker.getInternalScheduler(); if (js != null) { - js.removeAllJobs(); + js.removeAllJobs(message); } } @Override - public void removeAllJobs(long start, long finish) throws Exception { + public void removeAllJobs(long start, long finish, Message message) throws Exception { JobScheduler js = this.broker.getInternalScheduler(); if (js != null) { - js.removeAllJobs(start, finish); + js.removeAllJobs(start, finish, message); } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java index 3a778c5ad1c..dc7906a7631 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java @@ -272,7 +272,7 @@ public void send(ProducerBrokerExchange producerExchange, final Message messageS } } if (jobId != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE)) { - scheduler.remove(jobId); + scheduler.remove(jobId, messageSend); } else if (action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL)) { if (startTime != null && endTime != null) { @@ -280,9 +280,9 @@ public void send(ProducerBrokerExchange producerExchange, final Message messageS long start = (Long) TypeConversionSupport.convert(startTime, Long.class); long finish = (Long) TypeConversionSupport.convert(endTime, Long.class); - scheduler.removeAllJobs(start, finish); + scheduler.removeAllJobs(start, finish, messageSend); } else { - scheduler.removeAllJobs(); + scheduler.removeAllJobs(messageSend); } } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobScheduler.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobScheduler.java index 92a08ff8c9f..a5af459a96f 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobScheduler.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobScheduler.java @@ -38,6 +38,7 @@ import org.apache.activemq.broker.scheduler.JobListener; import org.apache.activemq.broker.scheduler.JobScheduler; import org.apache.activemq.broker.scheduler.JobSupport; +import org.apache.activemq.command.Message; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.IdGenerator; import org.slf4j.Logger; @@ -129,22 +130,22 @@ public void schedule(String jobId, ByteSequence payload, String cronEntry, long } @Override - public void remove(long time) throws Exception { + public void remove(long time, Message message) throws Exception { doRemoveRange(time, time); } @Override - public void remove(String jobId) throws Exception { + public void remove(String jobId, Message message) throws Exception { doRemoveJob(jobId); } @Override - public void removeAllJobs() throws Exception { + public void removeAllJobs(Message message) throws Exception { doRemoveRange(0, Long.MAX_VALUE); } @Override - public void removeAllJobs(long start, long finish) throws Exception { + public void removeAllJobs(long start, long finish, Message message) throws Exception { doRemoveRange(start, finish); } diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java index 29e9e23ee83..b8422bce6f2 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java @@ -33,6 +33,7 @@ import org.apache.activemq.broker.scheduler.Job; import org.apache.activemq.broker.scheduler.JobListener; import org.apache.activemq.broker.scheduler.JobScheduler; +import org.apache.activemq.command.Message; import org.apache.activemq.protobuf.Buffer; import org.apache.activemq.store.kahadb.data.KahaAddScheduledJobCommand; import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobCommand; @@ -101,22 +102,22 @@ public void schedule(final String jobId, final ByteSequence payload, final Strin } @Override - public void remove(final long time) throws IOException { + public void remove(final long time, Message message) throws IOException { doRemoveRange(time, time); } @Override - public void remove(final String jobId) throws IOException { + public void remove(final String jobId, Message message) throws IOException { doRemove(-1, jobId); } @Override - public void removeAllJobs() throws IOException { + public void removeAllJobs(Message message) throws IOException { doRemoveRange(0, Long.MAX_VALUE); } @Override - public void removeAllJobs(final long start, final long finish) throws IOException { + public void removeAllJobs(final long start, final long finish, Message message) throws IOException { doRemoveRange(start, finish); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java index b84a782c4b5..2356f41be70 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java @@ -144,12 +144,12 @@ public void testRemoveLong() throws Exception { assertEquals(size, COUNT); long removeTime = scheduler.getNextScheduleTime(); - scheduler.remove(removeTime); + scheduler.remove(removeTime, null); // If all jobs are not started within the same second we need to call remove again if (size != 0) { removeTime = scheduler.getNextScheduleTime(); - scheduler.remove(removeTime); + scheduler.remove(removeTime, null); } size = scheduler.getAllJobs().size(); @@ -172,7 +172,7 @@ public void testRemoveString() throws Exception { int size = scheduler.getAllJobs().size(); assertEquals(size, COUNT + 1); - scheduler.remove(test); + scheduler.remove(test, null); size = scheduler.getAllJobs().size(); assertEquals(size, COUNT); } @@ -264,7 +264,7 @@ public void testRemoveAllJobsInRange() throws Exception { } start = System.currentTimeMillis(); long finish = start + 12000 + (COUNT * 1000); - scheduler.removeAllJobs(start, finish); + scheduler.removeAllJobs(start, finish, null); assertTrue(scheduler.getAllJobs().isEmpty()); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTest.java index 9b86693054b..b10c005d777 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTest.java @@ -148,12 +148,12 @@ public void testRemoveLong() throws Exception { assertEquals(size, COUNT); long removeTime = scheduler.getNextScheduleTime(); - scheduler.remove(removeTime); + scheduler.remove(removeTime, null); // If all jobs are not started within the same second we need to call remove again if (size != 0) { removeTime = scheduler.getNextScheduleTime(); - scheduler.remove(removeTime); + scheduler.remove(removeTime, null); } size = scheduler.getAllJobs().size(); @@ -176,7 +176,7 @@ public void testRemoveString() throws Exception { int size = scheduler.getAllJobs().size(); assertEquals(size, COUNT + 1); - scheduler.remove(test); + scheduler.remove(test, null); size = scheduler.getAllJobs().size(); assertEquals(size, COUNT); } @@ -268,7 +268,7 @@ public void testRemoveAllJobsInRange() throws Exception { } start = System.currentTimeMillis(); long finish = start + 12000 + (COUNT * 1000); - scheduler.removeAllJobs(start, finish); + scheduler.removeAllJobs(start, finish, null); assertTrue(scheduler.getAllJobs().isEmpty()); }