diff --git a/xtraplatform-jobs/src/main/java/de/ii/xtraplatform/jobs/app/JobQueueBackendLocal.java b/xtraplatform-jobs/src/main/java/de/ii/xtraplatform/jobs/app/JobQueueBackendLocal.java index 8d13a353..efc564b1 100644 --- a/xtraplatform-jobs/src/main/java/de/ii/xtraplatform/jobs/app/JobQueueBackendLocal.java +++ b/xtraplatform-jobs/src/main/java/de/ii/xtraplatform/jobs/app/JobQueueBackendLocal.java @@ -36,6 +36,7 @@ @Singleton @AutoBind(interfaces = JobQueueBackend.class) +@SuppressWarnings({"PMD.TooManyMethods", "PMD.AvoidSynchronizedAtMethodLevel"}) public class JobQueueBackendLocal extends AbstractJobQueueBackend> implements JobQueueBackend { diff --git a/xtraplatform-jobs/src/main/java/de/ii/xtraplatform/jobs/app/JobRunner.java b/xtraplatform-jobs/src/main/java/de/ii/xtraplatform/jobs/app/JobRunner.java index 3ed155c0..e6b5ca72 100644 --- a/xtraplatform-jobs/src/main/java/de/ii/xtraplatform/jobs/app/JobRunner.java +++ b/xtraplatform-jobs/src/main/java/de/ii/xtraplatform/jobs/app/JobRunner.java @@ -52,6 +52,7 @@ @Singleton @AutoBind(interfaces = {AppLifeCycle.class}) +@SuppressWarnings({"PMD.TooManyMethods", "PMD.GuardLogStatement"}) public class JobRunner extends AbstractVolatileComposed implements AppLifeCycle, VolatileComposed { private static final Logger LOGGER = LoggerFactory.getLogger(JobRunner.class); @@ -83,15 +84,19 @@ public class JobRunner extends AbstractVolatileComposed implements AppLifeCycle, (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool( 1, new ThreadFactoryBuilder().setNameFormat("jobs.poll-%d").build())); + + int jobConcurrency = appContext.getConfiguration().getJobConcurrency(); + @SuppressWarnings( + "PMD.CloseResource") // False positive: executor is properly closed in onStop() ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool( - appContext.getConfiguration().getJobConcurrency(), - new ThreadFactoryBuilder().setNameFormat("jobs.exec-%d").build()); - this.executor = MoreExecutors.getExitingExecutorService(threadPoolExecutor); - this.executorName = appContext.getInstanceName(); + jobConcurrency, new ThreadFactoryBuilder().setNameFormat("jobs.exec-%d").build()); + this.maxThreads = threadPoolExecutor.getMaximumPoolSize(); this.activeThreads = threadPoolExecutor::getActiveCount; + this.executor = MoreExecutors.getExitingExecutorService(threadPoolExecutor); + this.executorName = appContext.getInstanceName(); this.activeJobSets = Collections.synchronizedSet(new LinkedHashSet<>()); this.asyncStartup = appContext.getConfiguration().getModules().isStartupAsync(); @@ -113,6 +118,32 @@ public CompletionStage onStart(boolean isStartupAsync) { return AppLifeCycle.super.onStart(isStartupAsync); } + @Override + public void onStop() { + LOGGER.info("Shutting down job runner..."); + + // Shutdown executors gracefully + polling.shutdown(); + executor.shutdown(); + + try { + // Wait for tasks to complete + if (!executor.awaitTermination(30, TimeUnit.SECONDS)) { + LOGGER.warn("Job executor did not terminate gracefully, forcing shutdown"); + executor.shutdownNow(); + } + if (!polling.awaitTermination(5, TimeUnit.SECONDS)) { + LOGGER.warn("Polling executor did not terminate gracefully, forcing shutdown"); + polling.shutdownNow(); + } + } catch (InterruptedException e) { + LOGGER.error("Interrupted while waiting for executors to terminate", e); + executor.shutdownNow(); + polling.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + @Override protected Tuple volatileInit() { if (asyncStartup) { @@ -124,71 +155,78 @@ protected Tuple volatileInit() { private void init() { jobQueue.onPush(this::checkWork); - // check for orphaned jobs every minute - polling.scheduleAtFixedRate( - () -> { - long oneMinuteAgo = Instant.now().minus(Duration.ofMinutes(1)).getEpochSecond(); - - if (logJobsTrace()) { - LOGGER.trace(MARKER.JOBS, "Checking for orphaned jobs (updatedAt < {})", oneMinuteAgo); - } - - for (Job job : jobQueue.getTaken()) { - // TODO: also update vector progress, remove raster check - if (job.getType().equals("tile-seeding:raster:png") - && job.getUpdatedAt().get() < oneMinuteAgo) { - if (logJobsDebug()) { - LOGGER.debug(MARKER.JOBS, "Found orphaned job, adding to queue again: {}", job); - } - jobQueue.push(job, true); - } - } + // Check for orphaned jobs and cleanup old job sets every minute + polling.scheduleAtFixedRate(this::checkOrphanedJobsAndCleanup, 1, 1, TimeUnit.MINUTES); - if (logJobsTrace()) { - LOGGER.trace(MARKER.JOBS, "Finished checking for orphaned jobs"); - } + // Log progress for active job sets every 5 seconds + polling.scheduleAtFixedRate(this::logActiveJobSetProgress, 5, 5, TimeUnit.SECONDS); + } + + private void checkOrphanedJobsAndCleanup() { + checkOrphanedJobs(); + cleanupOldJobSets(); + } - // remove done job sets older than one hour - long oneHourAgo = Instant.now().minus(Duration.ofHours(1)).getEpochSecond(); + private void checkOrphanedJobs() { + long oneMinuteAgo = Instant.now().minus(Duration.ofMinutes(1)).getEpochSecond(); - for (JobSet jobSet : jobQueue.getSets()) { - if (jobSet.isDone() && jobSet.getUpdatedAt().get() < oneHourAgo) { - jobQueue.doneSet(jobSet.getId()); + if (logJobsTrace()) { + LOGGER.trace(MARKER.JOBS, "Checking for orphaned jobs (updatedAt < {})", oneMinuteAgo); + } + + for (Job job : jobQueue.getTaken()) { + // TODO: also update vector progress, remove raster check + if ("tile-seeding:raster:png".equals(job.getType()) + && job.getUpdatedAt().get() < oneMinuteAgo) { + if (logJobsDebug()) { + LOGGER.debug(MARKER.JOBS, "Found orphaned job, adding to queue again: {}", job); + } + jobQueue.push(job, true); + } + } + + if (logJobsTrace()) { + LOGGER.trace(MARKER.JOBS, "Finished checking for orphaned jobs"); + } + } + + private void cleanupOldJobSets() { + long oneHourAgo = Instant.now().minus(Duration.ofHours(1)).getEpochSecond(); + + for (JobSet jobSet : jobQueue.getSets()) { + if (jobSet.isDone() && jobSet.getUpdatedAt().get() < oneHourAgo) { + jobQueue.doneSet(jobSet.getId()); + } + } + } + + private void logActiveJobSetProgress() { + if (logJobsDebug()) { + activeJobSets.forEach( + (jobSetId) -> { + JobSet jobSet = jobQueue.getSet(jobSetId); + if (jobSet == null) { + return; } - } - }, - 1, - 1, - TimeUnit.MINUTES); - - // log progress for active job sets every 5 seconds - polling.scheduleAtFixedRate( - () -> { - if (logJobsDebug()) { - activeJobSets.forEach( - (jobSetId) -> { - JobSet jobSet = jobQueue.getSet(jobSetId); - if (Objects.nonNull(jobSet)) { - if (jobSet.getEntity().isPresent()) { - LogContext.put(LogContext.CONTEXT.SERVICE, jobSet.getEntity().get()); - } - LOGGER.debug( - MARKER.JOBS, - "{} at {}%{}", - jobSet.getLabel(), - jobSet.getPercent(), - jobSet.getDescription().orElse("")); - } - }); - } - if (logJobsTrace()) { - LOGGER.trace( - MARKER.JOBS, "Job processor threads busy: {}/{}", activeThreads.get(), maxThreads); - } - }, - 5, - 5, - TimeUnit.SECONDS); + + if (jobSet.getEntity().isPresent()) { + LogContext.put(LogContext.CONTEXT.SERVICE, jobSet.getEntity().get()); + } + + if (logJobsDebug()) { + LOGGER.debug( + MARKER.JOBS, + "{} at {}%{}", + jobSet.getLabel(), + jobSet.getPercent(), + jobSet.getDescription().orElse("")); + } + }); + } + if (logJobsTrace()) { + LOGGER.trace( + MARKER.JOBS, "Job processor threads busy: {}/{}", activeThreads.get(), maxThreads); + } } private Optional> getJobType(String jobType) { @@ -204,122 +242,155 @@ private void checkWork(String jobType) { if (logJobsTrace()) { LOGGER.trace(MARKER.JOBS, "CHECK WORK {}", jobType); } - List> orderedProcessors = - processors.get().stream() - .filter( - p -> - Objects.equals(jobType, JOB_TYPE_WILDCARD) - || Objects.equals(jobType, p.getJobType())) - .sorted( - Comparator.>comparingInt(JobProcessor::getPriority).reversed()) - .toList(); + List> orderedProcessors = getOrderedProcessors(jobType); while (activeJobs.get() < maxThreads) { - boolean hasWork = false; - - for (JobProcessor processor : orderedProcessors) { - Optional optionalJob = jobQueue.take(processor.getJobType(), executorName); - - if (optionalJob.isPresent()) { - hasWork = true; - if (logJobsTrace()) { - LOGGER.trace( - MARKER.JOBS, - "Assigned job to processor {}: {} ({})", - processor.getJobType(), - optionalJob.get().getId(), - optionalJob.get().getPriority()); - } - if (optionalJob.get().getPartOf().isPresent()) { - JobSet jobSet = jobQueue.getSet(optionalJob.get().getPartOf().get()); - - if (jobSet.getEntity().isPresent()) { - LogContext.put(LogContext.CONTEXT.SERVICE, jobSet.getEntity().get()); - } + if (!tryAssignWork(orderedProcessors)) { + break; + } + } + } - activeJobSets.add(jobSet.getId()); - } + private List> getOrderedProcessors(String jobType) { + return processors.get().stream() + .filter( + p -> + Objects.equals(jobType, JOB_TYPE_WILDCARD) + || Objects.equals(jobType, p.getJobType())) + .sorted(Comparator.>comparingInt(JobProcessor::getPriority).reversed()) + .toList(); + } - try { - run(processor, optionalJob.get()); - } catch (Throwable e) { - LogContext.error(LOGGER, e, "Error scheduling job"); - } + private boolean tryAssignWork(List> orderedProcessors) { + for (JobProcessor processor : orderedProcessors) { + Optional optionalJob = jobQueue.take(processor.getJobType(), executorName); - break; - } + if (optionalJob.isPresent()) { + assignJobToProcessor(processor, optionalJob.get()); + return true; } - if (!hasWork) { - break; + } + return false; + } + + private void assignJobToProcessor(JobProcessor processor, Job job) { + if (logJobsTrace()) { + LOGGER.trace( + MARKER.JOBS, + "Assigned job to processor {}: {} ({})", + processor.getJobType(), + job.getId(), + job.getPriority()); + } + + setupJobContext(job); + + try { + run(processor, job); + } catch (Throwable e) { + LogContext.error(LOGGER, e, "Error scheduling job"); + } + } + + private void setupJobContext(Job job) { + if (job.getPartOf().isPresent()) { + JobSet jobSet = jobQueue.getSet(job.getPartOf().get()); + + if (jobSet.getEntity().isPresent()) { + LogContext.put(LogContext.CONTEXT.SERVICE, jobSet.getEntity().get()); } + + activeJobSets.add(jobSet.getId()); } } private void run(JobProcessor processor, Job job) { activeJobs.incrementAndGet(); - executor.execute( - () -> { - Instant start = Instant.now(); - Optional jobSet = job.getPartOf().map(jobQueue::getSet); - - if (jobSet.isPresent() && jobSet.get().getEntity().isPresent()) { - LogContext.put(LogContext.CONTEXT.SERVICE, jobSet.get().getEntity().get()); - } - - if (logJobsTrace()) { - LOGGER.trace(MARKER.JOBS, "Processing job: {}", job); - } - - if (jobSet.isPresent() && !jobSet.get().isStarted()) { - if (jobSet.get().getSetup().isEmpty() - || !Objects.equals(job.getId(), jobSet.get().getSetup().get().getId())) { - jobQueue.startJobSet(jobSet.get()); - - if (LOGGER.isInfoEnabled() || LOGGER.isInfoEnabled(MARKER.JOBS)) { - LOGGER.info( - MARKER.JOBS, - "{} started ({})", - jobSet.get().getLabel(), - jobQueue.getJobSetDetails(JobSetDetails.class, jobSet.get()).getLabel()); - } - } - } - - JobResult result; - try { - result = processor.process(job, jobSet.orElse(null), jobQueue); - } catch (Throwable e) { - result = JobResult.error(e.getClass() + e.getMessage()); - LogContext.errorAsDebug(LOGGER, e, "Error processing job {}", job.getId()); - } - - if (result.isSuccess()) { - jobQueue.done(job.getId()); - } else if (result.isFailure()) { - boolean retry = jobQueue.error(job.getId(), result.getError().get(), result.isRetry()); - if (!retry) { - LOGGER.error( - "Error while processing job {}: {}", job.getId(), result.getError().get()); - } - } + executor.execute(() -> processJobExecution(processor, job)); + } - if (jobSet.isPresent()) { - if (jobSet.get().isDone()) { - activeJobSets.remove(jobSet.get().getId()); - } - } - - if (logJobsTrace()) { - if (result.isOnHold()) { - LOGGER.trace(MARKER.JOBS, "Postponed job: {}", job); - } else { - long duration = Instant.now().toEpochMilli() - start.toEpochMilli(); - LOGGER.trace(MARKER.JOBS, "Processed job in {}: {}", pretty(duration), job); - } - } - activeJobs.decrementAndGet(); - checkWork(JOB_TYPE_WILDCARD); - }); + private void processJobExecution(JobProcessor processor, Job job) { + Instant start = Instant.now(); + Optional jobSet = job.getPartOf().map(jobQueue::getSet); + + setupExecutionContext(jobSet); + logJobProcessingStart(job); + handleJobSetStartup(job, jobSet); + + JobResult result = executeJob(processor, job, jobSet); + handleJobResult(job, result); + cleanupJobExecution(jobSet, result, start); + + activeJobs.decrementAndGet(); + checkWork(JOB_TYPE_WILDCARD); + } + + private void setupExecutionContext(Optional jobSet) { + if (jobSet.isPresent() && jobSet.get().getEntity().isPresent()) { + LogContext.put(LogContext.CONTEXT.SERVICE, jobSet.get().getEntity().get()); + } + } + + private void logJobProcessingStart(Job job) { + if (logJobsTrace()) { + LOGGER.trace(MARKER.JOBS, "Processing job: {}", job); + } + } + + private void handleJobSetStartup(Job job, Optional jobSet) { + if (jobSet.isPresent() + && !jobSet.get().isStarted() + && (jobSet.get().getSetup().isEmpty() + || !Objects.equals(job.getId(), jobSet.get().getSetup().get().getId()))) { + jobQueue.startJobSet(jobSet.get()); + + if (LOGGER.isInfoEnabled() || LOGGER.isInfoEnabled(MARKER.JOBS)) { + LOGGER.info( + MARKER.JOBS, + "{} started ({})", + jobSet.get().getLabel(), + jobQueue.getJobSetDetails(JobSetDetails.class, jobSet.get()).getLabel()); + } + } + } + + private JobResult executeJob(JobProcessor processor, Job job, Optional jobSet) { + try { + return processor.process(job, jobSet.orElse(null), jobQueue); + } catch (Throwable e) { + LogContext.errorAsDebug(LOGGER, e, "Error processing job {}", job.getId()); + return JobResult.error(e.getClass() + e.getMessage()); + } + } + + private void handleJobResult(Job job, JobResult result) { + if (result.isSuccess()) { + jobQueue.done(job.getId()); + } else if (result.isFailure()) { + boolean retry = jobQueue.error(job.getId(), result.getError().get(), result.isRetry()); + if (!retry) { + LOGGER.error("Error while processing job {}: {}", job.getId(), result.getError().get()); + } + } + } + + private void cleanupJobExecution(Optional jobSet, JobResult result, Instant start) { + if (jobSet.isPresent() && jobSet.get().isDone()) { + activeJobSets.remove(jobSet.get().getId()); + } + + logJobProcessingEnd(result, start); + } + + private void logJobProcessingEnd(JobResult result, Instant start) { + if (logJobsTrace()) { + if (result.isOnHold()) { + LOGGER.trace(MARKER.JOBS, "Postponed job: {}", result); + } else { + long duration = Instant.now().toEpochMilli() - start.toEpochMilli(); + LOGGER.trace(MARKER.JOBS, "Processed job in {}: {}", pretty(duration), result); + } + } } private static boolean logJobsTrace() { diff --git a/xtraplatform-jobs/src/main/java/de/ii/xtraplatform/jobs/app/OpsEndpointJobs.java b/xtraplatform-jobs/src/main/java/de/ii/xtraplatform/jobs/app/OpsEndpointJobs.java index 829d9fc0..be000855 100644 --- a/xtraplatform-jobs/src/main/java/de/ii/xtraplatform/jobs/app/OpsEndpointJobs.java +++ b/xtraplatform-jobs/src/main/java/de/ii/xtraplatform/jobs/app/OpsEndpointJobs.java @@ -10,7 +10,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.github.azahnen.dagger.annotations.AutoBind; -import de.ii.xtraplatform.base.domain.AppContext; import de.ii.xtraplatform.base.domain.Jackson; import de.ii.xtraplatform.base.domain.LogContext; import de.ii.xtraplatform.base.domain.LogContext.MARKER; @@ -50,18 +49,19 @@ @Singleton @AutoBind public class OpsEndpointJobs implements OpsEndpoint { - public class JobResponse { - public List sets; - public List open; - } private static final Logger LOGGER = LoggerFactory.getLogger(OpsEndpointJobs.class); private final JobQueue jobQueue; private final ObjectMapper objectMapper; + public class JobResponse { + public List sets; + public List open; + } + @Inject - public OpsEndpointJobs(AppContext appContext, Jackson jackson, JobQueue jobQueue) { + public OpsEndpointJobs(Jackson jackson, JobQueue jobQueue) { this.objectMapper = jackson.getDefaultObjectMapper(); this.jobQueue = jobQueue; } @@ -120,43 +120,59 @@ public Response getJobs(@QueryParam("debug") boolean debug) throws JsonProcessin @ApiResponse(responseCode = "204", description = "No content"), @ApiResponse(responseCode = "500", description = "Internal server error") }) - public synchronized Response takeJob(Map executor) - throws JsonProcessingException { - Optional job = jobQueue.take(executor.get("type"), executor.get("id")); - - if (job.isPresent()) { - if (LOGGER.isTraceEnabled() || LOGGER.isTraceEnabled(MARKER.JOBS)) { - LOGGER.trace( - MARKER.JOBS, - "Job {} taken by remote executor {}", - job.get().getId(), - executor.get("id")); + public Response takeJob(Map executor) throws JsonProcessingException { + synchronized (this) { + Optional job = jobQueue.take(executor.get("type"), executor.get("id")); + + if (job.isEmpty()) { + return Response.noContent().build(); } - Optional jobSet = job.get().getPartOf().map(jobQueue::getSet); + return processJobTaken(job.get(), executor.get("id")); + } + } - if (jobSet.isPresent() && jobSet.get().getEntity().isPresent()) { - LogContext.put(LogContext.CONTEXT.SERVICE, jobSet.get().getEntity().get()); - } + private Response processJobTaken(Job job, String executorId) throws JsonProcessingException { + logJobTaken(job, executorId); - if (jobSet.isPresent() && !jobSet.get().isStarted()) { - if (jobSet.get().getSetup().isEmpty() - || !Objects.equals(job.get().getId(), jobSet.get().getSetup().get().getId())) { - jobQueue.startJobSet(jobSet.get()); - - if (LOGGER.isInfoEnabled() || LOGGER.isInfoEnabled(MARKER.JOBS)) { - LOGGER.info( - MARKER.JOBS, - "{} started ({})", - jobSet.get().getLabel(), - jobQueue.getJobSetDetails(JobSetDetails.class, jobSet.get()).getLabel()); - } - } - } - return Response.ok(objectMapper.writeValueAsString(job.get())).build(); + Optional jobSet = job.getPartOf().map(jobQueue::getSet); + setupJobSetContext(jobSet); + handleJobSetStartup(job, jobSet); + + return Response.ok(objectMapper.writeValueAsString(job)).build(); + } + + private void logJobTaken(Job job, String executorId) { + if (LOGGER.isTraceEnabled() || LOGGER.isTraceEnabled(MARKER.JOBS)) { + LOGGER.trace(MARKER.JOBS, "Job {} taken by remote executor {}", job.getId(), executorId); + } + } + + private void setupJobSetContext(Optional jobSet) { + if (jobSet.isPresent() && jobSet.get().getEntity().isPresent()) { + LogContext.put(LogContext.CONTEXT.SERVICE, jobSet.get().getEntity().get()); + } + } + + private void handleJobSetStartup(Job job, Optional jobSet) { + if (jobSet.isPresent() + && !jobSet.get().isStarted() + && (jobSet.get().getSetup().isEmpty() + || !Objects.equals(job.getId(), jobSet.get().getSetup().get().getId()))) { + + jobQueue.startJobSet(jobSet.get()); + logJobSetStarted(jobSet.get()); } + } - return Response.noContent().build(); + private void logJobSetStarted(JobSet jobSet) { + if (LOGGER.isInfoEnabled() || LOGGER.isInfoEnabled(MARKER.JOBS)) { + LOGGER.info( + MARKER.JOBS, + "{} started ({})", + jobSet.getLabel(), + jobQueue.getJobSetDetails(JobSetDetails.class, jobSet).getLabel()); + } } @POST @@ -168,38 +184,39 @@ public synchronized Response takeJob(Map executor) @ApiResponse(responseCode = "204", description = "No content"), @ApiResponse(responseCode = "500", description = "Internal server error") }) - public synchronized Response updateJob( - @PathParam("jobId") String jobId, Map progress) { - try { - Optional job = - jobQueue.getTaken().stream() - .filter(job1 -> Objects.equals(job1.getId(), jobId)) - .findFirst(); + public Response updateJob(@PathParam("jobId") String jobId, Map progress) { + synchronized (this) { + try { + Optional job = + jobQueue.getTaken().stream() + .filter(job1 -> Objects.equals(job1.getId(), jobId)) + .findFirst(); - if (job.isPresent()) { - int delta = (Integer) progress.getOrDefault("delta", 0); + if (job.isPresent()) { + int delta = (Integer) progress.getOrDefault("delta", 0); - jobQueue.updateJob(job.get(), delta); + jobQueue.updateJob(job.get(), delta); - if (delta > 0 && job.get().getPartOf().isPresent()) { - JobSet set = jobQueue.getSet(job.get().getPartOf().get()); - jobQueue.updateJobSet(set, delta, progress); - } + if (delta > 0 && job.get().getPartOf().isPresent()) { + JobSet set = jobQueue.getSet(job.get().getPartOf().get()); + jobQueue.updateJobSet(set, delta, progress); + } - if (LOGGER.isTraceEnabled() || LOGGER.isTraceEnabled(MARKER.JOBS)) { - LOGGER.trace( - MARKER.JOBS, "Job {} progress updated by remote executor ({})", jobId, progress); - } - } else { - if (LOGGER.isWarnEnabled() || LOGGER.isWarnEnabled(MARKER.JOBS)) { - LOGGER.warn(MARKER.JOBS, "Received progress update for unknown job {}", jobId); + if (LOGGER.isTraceEnabled() || LOGGER.isTraceEnabled(MARKER.JOBS)) { + LOGGER.trace( + MARKER.JOBS, "Job {} progress updated by remote executor ({})", jobId, progress); + } + } else { + if (LOGGER.isWarnEnabled() || LOGGER.isWarnEnabled(MARKER.JOBS)) { + LOGGER.warn(MARKER.JOBS, "Received progress update for unknown job {}", jobId); + } } + } catch (Throwable e) { + LOGGER.error("Error while updating job {}", jobId, e); } - } catch (Throwable e) { - LOGGER.error("Error while updating job {}", jobId, e); - } - return Response.noContent().build(); + return Response.noContent().build(); + } } @DELETE @@ -212,28 +229,30 @@ public synchronized Response updateJob( @ApiResponse(responseCode = "404", description = "Job not found"), @ApiResponse(responseCode = "500", description = "Internal server error") }) - public synchronized Response closeJob( + public Response closeJob( @PathParam("jobId") String jobId, @Parameter(hidden = true) Map result) { - if (result.containsKey("error") && Objects.nonNull(result.get("error"))) { - boolean retry = - jobQueue.error(jobId, result.get("error"), Boolean.parseBoolean(result.get("retry"))); + synchronized (this) { + if (result.containsKey("error") && Objects.nonNull(result.get("error"))) { + boolean retry = + jobQueue.error(jobId, result.get("error"), Boolean.parseBoolean(result.get("retry"))); + + if (LOGGER.isTraceEnabled() || LOGGER.isTraceEnabled(MARKER.JOBS)) { + LOGGER.trace( + MARKER.JOBS, "Job {} marked as error by remote executor (retry: {})", jobId, retry); + } - if (LOGGER.isTraceEnabled() || LOGGER.isTraceEnabled(MARKER.JOBS)) { - LOGGER.trace( - MARKER.JOBS, "Job {} marked as error by remote executor (retry: {})", jobId, retry); + return Response.noContent().build(); } - return Response.noContent().build(); - } + if (jobQueue.done(jobId)) { + if (LOGGER.isTraceEnabled() || LOGGER.isTraceEnabled(MARKER.JOBS)) { + LOGGER.trace(MARKER.JOBS, "Job {} marked as done by remote executor", jobId); + } - if (jobQueue.done(jobId)) { - if (LOGGER.isTraceEnabled() || LOGGER.isTraceEnabled(MARKER.JOBS)) { - LOGGER.trace(MARKER.JOBS, "Job {} marked as done by remote executor", jobId); + return Response.noContent().build(); } - return Response.noContent().build(); + return Response.status(Status.NOT_FOUND).build(); } - - return Response.status(Status.NOT_FOUND).build(); } } diff --git a/xtraplatform-jobs/src/main/java/de/ii/xtraplatform/jobs/domain/AbstractJobQueueBackend.java b/xtraplatform-jobs/src/main/java/de/ii/xtraplatform/jobs/domain/AbstractJobQueueBackend.java index f0cdf97d..58d72923 100644 --- a/xtraplatform-jobs/src/main/java/de/ii/xtraplatform/jobs/domain/AbstractJobQueueBackend.java +++ b/xtraplatform-jobs/src/main/java/de/ii/xtraplatform/jobs/domain/AbstractJobQueueBackend.java @@ -18,16 +18,17 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +@SuppressWarnings("PMD.TooManyMethods") public abstract class AbstractJobQueueBackend extends AbstractVolatileComposed implements JobQueueBackend { - private static final Logger LOGGER = LoggerFactory.getLogger(AbstractJobQueueBackend.class); private final Map> priorities; private final Map> queues; + // Pre-create comparator and avoid object creation in checkQueue + private static final Comparator REVERSE_ORDER = Comparator.reverseOrder(); + protected AbstractJobQueueBackend(VolatileRegistry volatileRegistry) { super(volatileRegistry); @@ -82,7 +83,7 @@ public void push(BaseJob job, boolean untake) { updateJobSet((JobSet) job); if (((JobSet) job).getSetup().isPresent()) { - push(((JobSet) job).getSetup().get()); + push(((JobSet) job).getSetup().get(), false); } } else { throw new IllegalArgumentException("Unknown job type: " + job.getClass()); @@ -95,11 +96,9 @@ public Optional take(String type, String executor) { T queue = getQueue(type, priority); Optional job = takeJob(queue); - if (job.isEmpty()) { - continue; + if (job.isPresent()) { + return Optional.of(startJob(job.get(), executor)); } - - return Optional.of(startJob(job.get(), executor)); } return Optional.empty(); @@ -113,7 +112,7 @@ public boolean done(String jobId) { Job doneJob = doneJob(job.get()); onJobFinished(doneJob); - doneJob.getFollowUps().forEach(this::push); + doneJob.getFollowUps().forEach(followUp -> push(followUp, false)); return true; } @@ -149,22 +148,25 @@ public JobSet getSet(String setId) { @Override public Map>> getOpen() { - Map>> openJobs = new LinkedHashMap<>(); Set priorityTypes = getTypes(); - for (String type : priorityTypes) { - Set priorities = getPriorities(type); - Map> priorityMap = new LinkedHashMap<>(); - - for (Integer priority : priorities) { - T queue = getQueue(type, priority); - - priorityMap.put(priority, getJobsInQueue(queue)); - } - - openJobs.put(type, priorityMap); - } - return openJobs; + return priorityTypes.stream() + .collect( + LinkedHashMap::new, + (map, type) -> { + Set priorities = getPriorities(type); + Map> priorityMap = + priorities.stream() + .collect( + LinkedHashMap::new, + (pMap, priority) -> { + T queue = getQueue(type, priority); + pMap.put(priority, getJobsInQueue(queue)); + }, + Map::putAll); + map.put(type, priorityMap); + }, + Map::putAll); } protected abstract List getTakenIds(); @@ -173,7 +175,8 @@ public Map>> getOpen() { @Override public Collection getTaken() { - return getTakenIds().stream() + List takenIds = getTakenIds(); + return takenIds.stream() .map(this::getJob) .filter(Optional::isPresent) .map(Optional::get) @@ -182,7 +185,8 @@ public Collection getTaken() { @Override public Collection getFailed() { - return getFailedIds().stream() + List failedIds = getFailedIds(); + return failedIds.stream() .map(this::getJob) .filter(Optional::isPresent) .map(Optional::get) @@ -204,17 +208,20 @@ protected Set getPriorities(String type) { } private void checkQueue(String type, int priority) { - if (!priorities.containsKey(type)) { - // synchronized (this) { - priorities.put(type, new TreeSet<>(Comparator.reverseOrder())); - queues.put(type, new ConcurrentHashMap<>()); - // } - } - if (!priorities.get(type).contains(priority)) { - // synchronized (this) { - priorities.get(type).add(priority); - queues.get(type).put(priority, createQueue(type, priority)); - // } + // Use computeIfAbsent to avoid checking and creating separately + Set typePriorities = + priorities.computeIfAbsent( + type, + k -> { + queues.put(k, new ConcurrentHashMap<>()); + return new TreeSet<>(REVERSE_ORDER); + }); + + // Use computeIfAbsent for queue creation too + Map typeQueues = queues.get(type); + if (!typePriorities.contains(priority)) { + typePriorities.add(priority); + typeQueues.computeIfAbsent(priority, p -> createQueue(type, p)); } } @@ -226,7 +233,7 @@ private void onJobFinished(Job job) { if (jobSet.isPresent()) { // TODO: if done, mark for removal List jobSetFollowUps = onJobFinished(job, jobSet.get()); - jobSetFollowUps.forEach(this::push); + jobSetFollowUps.forEach(followUp -> push(followUp, false)); } } } diff --git a/xtraplatform-jobs/src/main/java/de/ii/xtraplatform/jobs/domain/JobQueueBackend.java b/xtraplatform-jobs/src/main/java/de/ii/xtraplatform/jobs/domain/JobQueueBackend.java index 6772e137..ac63d5c0 100644 --- a/xtraplatform-jobs/src/main/java/de/ii/xtraplatform/jobs/domain/JobQueueBackend.java +++ b/xtraplatform-jobs/src/main/java/de/ii/xtraplatform/jobs/domain/JobQueueBackend.java @@ -15,5 +15,6 @@ public interface JobQueueBackend extends JobQueue { boolean isEnabled(); + @Override void setJobTypes(Function>> jobTypesMapper); } diff --git a/xtraplatform-jobs/src/main/java/de/ii/xtraplatform/jobs/domain/JobSet.java b/xtraplatform-jobs/src/main/java/de/ii/xtraplatform/jobs/domain/JobSet.java index 9253ee3a..e68c71e2 100644 --- a/xtraplatform-jobs/src/main/java/de/ii/xtraplatform/jobs/domain/JobSet.java +++ b/xtraplatform-jobs/src/main/java/de/ii/xtraplatform/jobs/domain/JobSet.java @@ -20,8 +20,68 @@ @Value.Immutable @JsonDeserialize(builder = ImmutableJobSet.Builder.class) +@SuppressWarnings("PMD.UseObjectForClearerAPI") public interface JobSet extends BaseJob { + @SuppressWarnings("PMD.DataClass") + final class JobSetConfiguration { + private final String type; + private final int priority; + private final String entity; + private final String label; + private final String description; + private final JobSetDetails details; + + private JobSetConfiguration( + String type, + int priority, + String entity, + String label, + String description, + JobSetDetails details) { + this.type = type; + this.priority = priority; + this.entity = entity; + this.label = label; + this.description = description; + this.details = details; + } + + public static JobSetConfiguration of( + String type, + int priority, + String entity, + String label, + String description, + JobSetDetails details) { + return new JobSetConfiguration(type, priority, entity, label, description, details); + } + + public String getType() { + return type; + } + + public int getPriority() { + return priority; + } + + public String getEntity() { + return entity; + } + + public String getLabel() { + return label; + } + + public String getDescription() { + return description; + } + + public JobSetDetails getDetails() { + return details; + } + } + interface JobSetDetails { void init(Map parameters); @@ -40,6 +100,20 @@ interface JobSetDetails { @Override List getFollowUps(); + static JobSet of(JobSetConfiguration config) { + return new ImmutableJobSet.Builder() + .type(config.getType()) + .priority(config.getPriority()) + .entity(config.getEntity()) + .label(config.getLabel()) + .description(config.getDescription()) + .details(config.getDetails()) + .startedAt(new AtomicLong()) + .total(new AtomicInteger(0)) + .current(new AtomicInteger(0)) + .build(); + } + static JobSet of( String type, int priority, @@ -47,17 +121,7 @@ static JobSet of( String label, String description, JobSetDetails details) { - return new ImmutableJobSet.Builder() - .type(type) - .priority(priority) - .entity(entity) - .label(label) - .description(description) - .details(details) - .startedAt(new AtomicLong()) - .total(new AtomicInteger(0)) - .current(new AtomicInteger(0)) - .build(); + return of(JobSetConfiguration.of(type, priority, entity, label, description, details)); } default JobSet with(Job setup, Job cleanup) {