Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public XacmlRequest(
actionAttributes.forEach((id, value) -> add(id, value, action, geoXacmlVersion));

Request =
version == XacmlJsonVersion._1_0
version == XacmlJsonVersion.V1_0
? request10(subject.build(), resource.build(), action.build())
: request11(subject.build(), resource.build(), action.build());
}
Expand All @@ -80,7 +80,7 @@ private static void add(
new Attribute(
id,
value,
geoXacmlVersion == GeoXacmlVersion._1_0
geoXacmlVersion == GeoXacmlVersion.V1_0
? "urn:ogc:def:dataType:geoxacml:1.0:geometry"
: "urn:ogc:def:geoxacml:3.0:data-type:geometry"));
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@
import de.ii.xtraplatform.base.domain.StoreSource;
import de.ii.xtraplatform.base.domain.StoreSource.Content;
import de.ii.xtraplatform.base.domain.StoreSource.Mode;
import de.ii.xtraplatform.base.domain.StoreSource.Type;
import de.ii.xtraplatform.base.domain.StoreSourceFs;
import java.nio.file.Path;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -35,17 +32,15 @@ public class StoreImpl implements Store {

private static final Logger LOGGER = LoggerFactory.getLogger(StoreImpl.class);

private final Path dataDirectory;
private final StoreConfiguration storeConfiguration;
private final List<StoreSource> sources;

@Inject
StoreImpl(AppContext appContext) {
this(appContext.getDataDir(), appContext.getConfiguration().getStore());
this(appContext.getConfiguration().getStore());
}

public StoreImpl(Path dataDirectory, StoreConfiguration storeConfiguration) {
this.dataDirectory = dataDirectory;
public StoreImpl(StoreConfiguration storeConfiguration) {
this.storeConfiguration = storeConfiguration;
this.sources =
storeConfiguration.getSources().stream()
Expand All @@ -54,30 +49,30 @@ public StoreImpl(Path dataDirectory, StoreConfiguration storeConfiguration) {
info();
}

public void info() {
LOGGER.info(
"Loading store ({}{}{})",
storeConfiguration.isReadOnly() ? "read-only" : "writable",
storeConfiguration.isWatch() ? ", watching for changes" : "",
storeConfiguration.isFiltered()
? String.format(", filtered by %s", storeConfiguration.getFilter().get().getAsLabel())
: "");

sources.forEach(
s -> {
String src =
Objects.equals(s.getType(), Type.FS_KEY)
? ((StoreSourceFs) s).getAbsolutePath(dataDirectory).toString()
: s.getSrc();
String mode = storeConfiguration.isReadOnly() ? "" : String.format(" [%s]", s.getMode());
String subType =
(s.getContent() == Content.RESOURCES || s.getContent() == Content.VALUES)
&& s.getPrefix().isPresent()
? String.format(" [%s]", s.getPrefix().get())
: "";

LOGGER.info(" {} [{}]{}{}", s.getLabelSpaces(), s.getContent(), subType, mode);
});
@SuppressWarnings("PMD.CognitiveComplexity")
private void info() {
if (LOGGER.isInfoEnabled()) {
LOGGER.info(
"Loading store ({}{}{})",
storeConfiguration.isReadOnly() ? "read-only" : "writable",
storeConfiguration.isWatch() ? ", watching for changes" : "",
storeConfiguration.isFiltered()
? String.format(", filtered by %s", storeConfiguration.getFilter().get().getAsLabel())
: "");

sources.forEach(
s -> {
String mode =
storeConfiguration.isReadOnly() ? "" : String.format(" [%s]", s.getMode());
String subType =
(s.getContent() == Content.RESOURCES || s.getContent() == Content.VALUES)
&& s.getPrefix().isPresent()
? String.format(" [%s]", s.getPrefix().get())
: "";

LOGGER.info(" {} [{}]{}{}", s.getLabelSpaces(), s.getContent(), subType, mode);
});
}
}

@Override
Expand All @@ -99,7 +94,7 @@ public List<StoreSource> get(Content content) {
source ->
source.getContent() == content
|| source.getContent() == Content.ALL
|| (content.isEvent() && source.getContent() == Content.ENTITIES))
|| content.isEvent() && source.getContent() == Content.ENTITIES)
.collect(Collectors.toUnmodifiableList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public VariableSubstitutor(boolean strict, boolean substitutionInVariables) {
this.setEnableSubstitutionInVariables(substitutionInVariables);
}

@Override
@SuppressWarnings("PMD.PreserveStackTrace")
protected boolean substitute(TextStringBuilder buf, int offset, int length) {
try {
return super.substitute(buf, offset, length);
Expand All @@ -92,9 +94,11 @@ private static Map<String, String> extract(Map<String, Object> constants, String
result.putAll(
extract((Map<String, Object>) entry.getValue(), prefix + entry.getKey() + "."));
} else if (entry.getValue() instanceof List) {
LOGGER.warn(
"Ignoring list value for substitution '{}'. Only scalar values and maps are supported.",
prefix + entry.getKey());
if (LOGGER.isWarnEnabled()) {
LOGGER.warn(
"Ignoring list value for substitution '{}'. Only scalar values and maps are supported.",
prefix + entry.getKey());
}
} else {
result.put(prefix + entry.getKey(), entry.getValue().toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

@Singleton
@AutoBind
@SuppressWarnings("PMD.TooManyMethods")
public class VolatileRegistryImpl implements VolatileRegistry {

private static final Logger LOGGER = LoggerFactory.getLogger(VolatileRegistryImpl.class);
Expand Down Expand Up @@ -77,7 +78,6 @@ public class VolatileRegistryImpl implements VolatileRegistry {
this.cancelTasks = new ArrayDeque<>();
this.onRegister = new ArrayList<>();
this.onUnRegister = new ArrayList<>();
this.currentSchedule = null;
this.currentRate = 0;
}

Expand Down Expand Up @@ -108,13 +108,14 @@ public void register(Volatile2 dependency) {
polls.put(dependency.getUniqueKey(), polling);
intervals.put(dependency.getUniqueKey(), 0);

schedulePoll((polling).getIntervalMs());
schedulePoll(polling.getIntervalMs());
}
}
}
}

@Override
@SuppressWarnings("PMD.AvoidSynchronizedAtMethodLevel")
public synchronized void unregister(Volatile2 dependency) {
synchronized (this) {
volatiles.remove(dependency.getUniqueKey());
Expand All @@ -131,32 +132,42 @@ public synchronized void unregister(Volatile2 dependency) {
@Override
public void change(Volatile2 dependency, State from, State to) {
String key = dependency.getUniqueKey();
logStateChange(from, to, key);
notifyWatchers(key, from, to);
}

private void logStateChange(State from, State to, String key) {
if (LOGGER.isDebugEnabled(MARKER.DI)) {
LOGGER.debug(MARKER.DI, "Volatile state changed from {} to {}: {}", from, to, key);
}
}

private void notifyWatchers(String key, State from, State to) {
synchronized (this) {
if (watchers.containsKey(key)) {
for (ChangeHandler handler : watchers.get(key)) {
if (Objects.nonNull(handler)) {
changeExecutor.submit(
() -> {
try {
handler.change(from, to);
} catch (Throwable e) {
// ignore
if (LOGGER.isDebugEnabled()) {
LogContext.errorAsDebug(LOGGER, e, "Error in volatile watcher");
}
}
});
}
executeHandler(handler, from, to);
}
}
}
}

private void executeHandler(ChangeHandler handler, State from, State to) {
if (Objects.nonNull(handler)) {
changeExecutor.submit(
() -> {
try {
handler.change(from, to);
} catch (Throwable e) {
// ignore
if (LOGGER.isDebugEnabled()) {
LogContext.errorAsDebug(LOGGER, e, "Error in volatile watcher");
}
}
});
}
}

@Override
public Runnable watch(Volatile2 dependency, ChangeHandler handler) {
String key = dependency.getUniqueKey();
Expand Down Expand Up @@ -224,48 +235,71 @@ public void listen(BiConsumer<String, Volatile2> onRegister, Consumer<String> on
}

private void schedulePoll(int delayMs) {
if (delayMs > 0 && currentRate > delayMs || currentRate <= 0) {
if (Objects.nonNull(currentSchedule)) {
// LOGGER.debug("Cancelling current polling schedule");
currentSchedule.cancel(false);
}
if (shouldReschedulePoll(delayMs)) {
cancelCurrentScheduleIfExists();
startNewPollingSchedule(delayMs);
}
}

this.currentRate = delayMs;
if (LOGGER.isDebugEnabled(MARKER.DI)) {
LOGGER.debug(MARKER.DI, "Scheduling polling every {}ms", delayMs);
private boolean shouldReschedulePoll(int delayMs) {
return delayMs > 0 && currentRate > delayMs || currentRate <= 0;
}

private void cancelCurrentScheduleIfExists() {
if (Objects.nonNull(currentSchedule)) {
// LOGGER.debug("Cancelling current polling schedule");
currentSchedule.cancel(false);
}
}

private void startNewPollingSchedule(int delayMs) {
this.currentRate = delayMs;
if (LOGGER.isDebugEnabled(MARKER.DI)) {
LOGGER.debug(MARKER.DI, "Scheduling polling every {}ms", delayMs);
}
this.currentSchedule =
pollingExecutor.scheduleWithFixedDelay(
() -> executePollingCycle(delayMs), delayMs, delayMs, TimeUnit.MILLISECONDS);
}

private void executePollingCycle(int delayMs) {
processCancelTasks();
processPollingIntervals(delayMs);
}

private void processCancelTasks() {
while (!cancelTasks.isEmpty()) {
cancelTasks.remove().run();
}
}

private void processPollingIntervals(int delayMs) {
for (Map.Entry<String, Integer> entry : intervals.entrySet()) {
String key = entry.getKey();
int interval = entry.getValue() - delayMs;

if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Checking {} in {}ms", key, interval);
}
this.currentSchedule =
pollingExecutor.scheduleWithFixedDelay(
() -> {
while (!cancelTasks.isEmpty()) {
cancelTasks.remove().run();
}

for (Map.Entry<String, Integer> entry : intervals.entrySet()) {
String key = entry.getKey();
int interval = entry.getValue() - delayMs;

if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Checking {} in {}ms", key, interval);
}

if (interval <= 0) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Checking {} now", key);
}
Polling polling = polls.get(key);
interval = polling.getIntervalMs();
if (interval <= 0) {
interval = executePolling(key);
}

Future<?> future = pollingExecutor.submit(polling::poll);
cancelTasks.add(() -> future.cancel(true));
}
intervals.put(key, interval);
}
}

intervals.put(key, interval);
}
},
delayMs,
delayMs,
TimeUnit.MILLISECONDS);
private int executePolling(String key) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Checking {} now", key);
}
Polling polling = polls.get(key);
int interval = polling.getIntervalMs();

Future<?> future = pollingExecutor.submit(polling::poll);
cancelTasks.add(() -> future.cancel(true));

return interval;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ public void setServerFactory(ServerFactory factory) {

@JsonIgnore
@Override
@SuppressWarnings("PMD.AvoidSynchronizedAtMethodLevel")
public synchronized void setLoggingFactory(LoggingFactory factory) {
throw new NotImplementedException();
}
Expand Down
Loading