Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions plugin/src/main/java/org/opennms/timeseries/cortex/CortexTSS.java
Original file line number Diff line number Diff line change
Expand Up @@ -233,15 +233,19 @@ public void store(final List<Sample> samples, String clientID) throws StorageExc
final Request request = builder.build();

LOG.trace("Writing: {}", writeRequest);
asyncHttpCallsBulkhead.executeCompletionStage(() -> executeAsync(request)).whenComplete((r, ex) -> {
if (ex == null) {
samplesWritten.mark(samplesSorted.size());
} else {
// FIXME: Data loss
samplesLost.mark(samples.size());
LOG.error("Error occurred while storing samples, sample will be lost.", ex);
}
});
// Block until the HTTP write completes. This ensures the ring buffer worker
// thread does not process the next batch until this write has landed, preserving
// per-series timestamp ordering across consecutive WriteRequests as required by
// the Prometheus Remote Write spec.
try {
asyncHttpCallsBulkhead.executeCompletionStage(() -> executeAsync(request)).toCompletableFuture().get(
config.getWriteTimeoutInMs(), TimeUnit.MILLISECONDS);
samplesWritten.mark(samplesSorted.size());
} catch (Exception ex) {
samplesLost.mark(samplesSorted.size());
Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
throw new StorageException("Failed to write samples to Prometheus: " + cause.getMessage(), cause);
}
}

private void persistExternalTags(final Sample s) {
Expand Down