diff --git a/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/JobQueueBackendRedis.java b/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/JobQueueBackendRedis.java index d66fd414..351b93b0 100644 --- a/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/JobQueueBackendRedis.java +++ b/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/JobQueueBackendRedis.java @@ -47,6 +47,7 @@ @Singleton @AutoBind(interfaces = JobQueueBackend.class) +@SuppressWarnings("PMD.TooManyMethods") public class JobQueueBackendRedis extends AbstractJobQueueBackend implements JobQueueBackend { @@ -54,6 +55,15 @@ public class JobQueueBackendRedis extends AbstractJobQueueBackend private static final List INITIAL_LEVELS = IntStream.range(0, 24).map(i -> -1).boxed().toList(); + // Redis key constants + private static final String REDIS_KEY_PRIORITIES = "xtraplatform:jobs:priorities:"; + private static final String REDIS_KEY_QUEUE = "xtraplatform:jobs:queue:"; + private static final String REDIS_KEY_JOB = "xtraplatform:jobs:job:"; + private static final String REDIS_KEY_SET = "xtraplatform:jobs:set:"; + private static final String REDIS_KEY_TAKEN = "xtraplatform:jobs:taken"; + private static final String REDIS_KEY_FAILED = "xtraplatform:jobs:failed"; + private static final String REDIS_KEY_NOTIFICATIONS = "xtraplatform:jobs:notifications"; + private final boolean enabled; private final Redis redis; private final ObjectMapper mapper; @@ -64,7 +74,8 @@ public class JobQueueBackendRedis extends AbstractJobQueueBackend AppContext appContext, Jackson jackson, VolatileRegistry volatileRegistry, Redis redis) { super(volatileRegistry); - // TODO: housekeeping might check taken list using RPOPLPUSH with same source and destination + // NOPMD - TODO: housekeeping might check taken list using RPOPLPUSH with same source and + // destination // this way it can check for timeouts, then use a transaction with LREM, LPUSH and HMSET to // retry @@ -96,21 +107,21 @@ public void setJobTypes(Function>> jobTypesM @Override protected String createQueue(String type, int priority) { - redis.cmd().zadd("xtraplatform:jobs:priorities:" + type, priority, String.valueOf(priority)); + redis.cmd().zadd(REDIS_KEY_PRIORITIES + type, priority, String.valueOf(priority)); - return "xtraplatform:jobs:queue:" + type + ":" + priority; + return REDIS_KEY_QUEUE + type + ":" + priority; } @Override protected Set getTypes() { - return redis.cmd().keys("xtraplatform:jobs:priorities:*").stream() - .map(key -> key.substring("xtraplatform:jobs:priorities:".length())) + return redis.cmd().keys(REDIS_KEY_PRIORITIES + "*").stream() + .map(key -> key.substring(REDIS_KEY_PRIORITIES.length())) .collect(LinkedHashSet::new, LinkedHashSet::add, LinkedHashSet::addAll); } @Override protected Set getPriorities(String type) { - List priorities = redis.cmd().zrevrange("xtraplatform:jobs:priorities:" + type, 0, -1); + List priorities = redis.cmd().zrevrange(REDIS_KEY_PRIORITIES + type, 0, -1); return new LinkedHashSet<>(priorities.stream().map(Integer::parseInt).toList()); } @@ -118,34 +129,27 @@ protected Set getPriorities(String type) { @Override protected void updateJob(Job job) { try { - redis.json().jsonSet("xtraplatform:jobs:job:" + job.getId(), mapper.writeValueAsString(job)); + redis.json().jsonSet(REDIS_KEY_JOB + job.getId(), mapper.writeValueAsString(job)); } catch (Throwable e) { - throw new RuntimeException(e); + throw new IllegalStateException("Failed to serialize job to JSON: " + job.getId(), e); } } @Override public void updateJob(Job job, int progressDelta) { - redis - .json() - .jsonNumIncrBy( - "xtraplatform:jobs:job:" + job.getId(), Path2.of("$.current"), progressDelta); + redis.json().jsonNumIncrBy(REDIS_KEY_JOB + job.getId(), Path2.of("$.current"), progressDelta); redis .json() .jsonSet( - "xtraplatform:jobs:job:" + job.getId(), - Path2.of("$.updatedAt"), - Instant.now().getEpochSecond()); + REDIS_KEY_JOB + job.getId(), Path2.of("$.updatedAt"), Instant.now().getEpochSecond()); } @Override protected void updateJobSet(JobSet jobSet) { try { - redis - .json() - .jsonSet("xtraplatform:jobs:set:" + jobSet.getId(), mapper.writeValueAsString(jobSet)); + redis.json().jsonSet(REDIS_KEY_SET + jobSet.getId(), mapper.writeValueAsString(jobSet)); } catch (Throwable e) { - throw new RuntimeException(e); + throw new IllegalStateException("Failed to serialize job set to JSON: " + jobSet.getId(), e); } } @@ -154,7 +158,7 @@ public void startJobSet(JobSet jobSet) { redis .json() .jsonSet( - "xtraplatform:jobs:set:" + jobSet.getId(), + REDIS_KEY_SET + jobSet.getId(), Path2.of("$.startedAt"), Instant.now().getEpochSecond()); } @@ -184,8 +188,7 @@ public void updateJobSet(JobSet jobSet, int progressDelta, Map d @Override protected Optional getJobSet(String setId) { - String jobSetJson = - redis.json().jsonGetAsPlainString("xtraplatform:jobs:set:" + setId, Path.ROOT_PATH); + String jobSetJson = redis.json().jsonGetAsPlainString(REDIS_KEY_SET + setId, Path.ROOT_PATH); if (Objects.isNull(jobSetJson)) { return Optional.empty(); @@ -196,7 +199,7 @@ protected Optional getJobSet(String setId) { return Optional.ofNullable(job); } catch (Throwable e) { - throw new RuntimeException(e); + throw new IllegalStateException("Failed to deserialize job set from JSON: " + setId, e); } } @@ -206,8 +209,8 @@ protected void queueJob(Job job, boolean untake) { updateJob(job); if (untake) { - // TODO: use a transaction here - redis.cmd().lrem("xtraplatform:jobs:taken", 1, job.getId()); + // NOPMD - TODO: use a transaction here + redis.cmd().lrem(REDIS_KEY_TAKEN, 1, job.getId()); redis.cmd().rpush(queue, job.getId()); } else { redis.cmd().lpush(queue, job.getId()); @@ -241,7 +244,7 @@ protected Job failJob(Job job, String error) { updateJob(failedJob); - redis.cmd().rpush("xtraplatform:jobs:failed", job.getId()); + redis.cmd().rpush(REDIS_KEY_FAILED, job.getId()); return failedJob; } @@ -250,7 +253,7 @@ protected Job failJob(Job job, String error) { protected Job doneJob(Job job) { Job doneJob = job.done(); - redis.json().jsonDel("xtraplatform:jobs:job:" + doneJob.getId()); + redis.json().jsonDel(REDIS_KEY_JOB + doneJob.getId()); return doneJob; } @@ -258,9 +261,7 @@ protected Job doneJob(Job job) { @Override protected Optional takeJob(String queue) { String jobId = - redis - .cmd() - .lmove(queue, "xtraplatform:jobs:taken", ListDirection.RIGHT, ListDirection.LEFT); + redis.cmd().lmove(queue, REDIS_KEY_TAKEN, ListDirection.RIGHT, ListDirection.LEFT); if (Objects.nonNull(jobId)) { return getJob(jobId); @@ -271,7 +272,7 @@ protected Optional takeJob(String queue) { @Override protected Optional untakeJob(String jobId) { - long count = redis.cmd().lrem("xtraplatform:jobs:taken", 1, jobId); + long count = redis.cmd().lrem(REDIS_KEY_TAKEN, 1, jobId); if (count > 0) { return getJob(jobId); @@ -284,7 +285,7 @@ protected Optional untakeJob(String jobId) { protected List onJobFinished(Job job, JobSet jobSet) { List followUps = jobSet.done(job); - redis.json().jsonDel("xtraplatform:jobs:job:" + job.getId()); + redis.json().jsonDel(REDIS_KEY_JOB + job.getId()); return followUps; } @@ -305,33 +306,33 @@ protected List getJobsInQueue(String queue) { @Override protected void notifyObservers(String type) { - redis.pubsub().publish("xtraplatform:jobs:notifications", type); + redis.pubsub().publish(REDIS_KEY_NOTIFICATIONS, type); } @Override public void onPush(Consumer callback) { - redis.pubsub().subscribe("xtraplatform:jobs:notifications", callback); + redis.pubsub().subscribe(REDIS_KEY_NOTIFICATIONS, callback); } @Override public boolean doneSet(String jobSetId) { - long count = redis.json().jsonDel("xtraplatform:jobs:set:" + jobSetId); + long count = redis.json().jsonDel(REDIS_KEY_SET + jobSetId); return count > 0; } @Override public boolean error(String jobId, String error, boolean retry) { - // TODO: retry logic + // NOPMD - TODO: retry logic return false; } @Override public Collection getSets() { - Set jobSetIds = redis.cmd().keys("xtraplatform:jobs:set:*"); + Set jobSetIds = redis.cmd().keys(REDIS_KEY_SET + "*"); return jobSetIds.stream() - .map(id -> id.substring("xtraplatform:jobs:set:".length())) + .map(id -> id.substring(REDIS_KEY_SET.length())) .map(this::getJobSet) .filter(Optional::isPresent) .map(Optional::get) @@ -340,18 +341,17 @@ public Collection getSets() { @Override protected List getTakenIds() { - return redis.cmd().lrange("xtraplatform:jobs:taken", 0, -1); + return redis.cmd().lrange(REDIS_KEY_TAKEN, 0, -1); } @Override protected List getFailedIds() { - return redis.cmd().lrange("xtraplatform:jobs:failed", 0, -1); + return redis.cmd().lrange(REDIS_KEY_FAILED, 0, -1); } @Override protected Optional getJob(String jobId) { - String jobJson = - redis.json().jsonGetAsPlainString("xtraplatform:jobs:job:" + jobId, Path.ROOT_PATH); + String jobJson = redis.json().jsonGetAsPlainString(REDIS_KEY_JOB + jobId, Path.ROOT_PATH); if (Objects.isNull(jobJson)) { return Optional.empty(); @@ -362,7 +362,7 @@ protected Optional getJob(String jobId) { return Optional.ofNullable(job); } catch (Throwable e) { - throw new RuntimeException(e); + throw new IllegalStateException("Failed to deserialize job from JSON: " + jobId, e); } } @@ -382,7 +382,8 @@ private Object unpackDetails(Job job) { } } catch (IOException e) { - throw new RuntimeException(e); + throw new IllegalStateException( + "Failed to convert job details to target type: " + job.getType(), e); } } @@ -405,7 +406,8 @@ private Object unpackSetDetails(JobSet jobSet) { } } catch (IOException e) { - throw new RuntimeException(e); + throw new IllegalStateException( + "Failed to convert job set details to target type: " + jobSet.getType(), e); } } @@ -428,14 +430,10 @@ private void applyJsonPaths(String jobSetId, Map jsonPathUpdates redis .json() .jsonNumIncrBy( - "xtraplatform:jobs:set:" + jobSetId, - Path2.of(entry.getKey()), - (Integer) entry.getValue()); + REDIS_KEY_SET + jobSetId, Path2.of(entry.getKey()), (Integer) entry.getValue()); continue; } - redis - .json() - .jsonSet("xtraplatform:jobs:set:" + jobSetId, Path2.of(entry.getKey()), entry.getValue()); + redis.json().jsonSet(REDIS_KEY_SET + jobSetId, Path2.of(entry.getKey()), entry.getValue()); } } } diff --git a/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisImpl.java b/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisImpl.java index 4193e147..ad0b2e34 100644 --- a/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisImpl.java +++ b/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisImpl.java @@ -74,6 +74,7 @@ public void onStop() { AppLifeCycle.super.onStop(); } + @SuppressWarnings("PMD.AvoidSynchronizedAtMethodLevel") @Override protected synchronized void onVolatileStart() { super.onVolatileStart(); @@ -145,7 +146,7 @@ public Tuple check() { connect(); if (Objects.isNull(jedis)) { - // TODO: retry + // NOPMD - TODO: retry if (Objects.nonNull(connectionError)) { return Tuple.of(State.UNAVAILABLE, connectionError.getMessage()); } diff --git a/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/ReactiveRx.java b/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/ReactiveRx.java index 0ff50a1f..c3211e16 100644 --- a/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/ReactiveRx.java +++ b/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/ReactiveRx.java @@ -30,6 +30,7 @@ @Singleton @AutoBind +@SuppressWarnings({"PMD.GodClass", "PMD.TooManyMethods"}) public class ReactiveRx implements Reactive { @Inject diff --git a/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/RunnerRx.java b/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/RunnerRx.java index f2c3c888..3a5a064a 100644 --- a/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/RunnerRx.java +++ b/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/RunnerRx.java @@ -24,8 +24,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; // TODO: the queue was introduced as a mean to protect the connection pool and prevent deadlocks // because of a bug (running.get() < queueSize instead of running.get() < capacity) it was never @@ -35,8 +33,6 @@ // FeatureStreams public class RunnerRx implements Runner { - private static final Logger LOGGER = LoggerFactory.getLogger(RunnerRx.class); - private final Scheduler scheduler; private final String name; private final int capacity; @@ -45,11 +41,16 @@ public class RunnerRx implements Runner { private final AtomicInteger running; public RunnerRx(String name) { - this(name, Runner.DYNAMIC_CAPACITY, Runner.DYNAMIC_CAPACITY); + this( + getConfig(Runner.DYNAMIC_CAPACITY), name, Runner.DYNAMIC_CAPACITY, Runner.DYNAMIC_CAPACITY); } public RunnerRx(String name, int capacity, int queueSize) { - this(getConfig(name, capacity), name, capacity, queueSize); + this(getConfig(capacity), name, capacity, queueSize); + } + + public RunnerRx(int capacity, int queueSize) { + this(getConfig(capacity), "default", capacity, queueSize); } RunnerRx(ExecutorService executorService, String name, int capacity, int queueSize) { @@ -58,7 +59,6 @@ public RunnerRx(String name, int capacity, int queueSize) { } // TODO: thread names - getDispatcherName(name); this.scheduler = Schedulers.from(executorService); scheduler.start(); @@ -153,26 +153,19 @@ public int getActiveStreams() { return running.get(); } - private static ExecutorService getConfig(String name, int capacity) { - return capacity == Runner.DYNAMIC_CAPACITY - ? getDefaultConfig(name) - : getConfig(name, capacity, capacity); + private static ExecutorService getConfig(int capacity) { + return capacity == Runner.DYNAMIC_CAPACITY ? getDefaultConfig() : getConfig(capacity, capacity); } - private static ExecutorService getDefaultConfig(String name) { - return getConfig(name, 8, 64); + private static ExecutorService getDefaultConfig() { + return getConfig(8, 64); } - // TODO - private static ExecutorService getConfig(String name, int parallelismMin, int parallelismMax) { + private static ExecutorService getConfig(int parallelismMin, int parallelismMax) { return Executors.newWorkStealingPool(Math.max(1, parallelismMax)); } - private static String getDispatcherName(String name) { - return String.format("stream.%s", name); - } - @Override public void close() { if (Objects.nonNull(scheduler)) {