diff --git a/benchmarks/build.gradle b/benchmarks/build.gradle index de5f7a7ab581f..96727eb3a2670 100644 --- a/benchmarks/build.gradle +++ b/benchmarks/build.gradle @@ -49,6 +49,7 @@ dependencies { api(project(':x-pack:plugin:esql:compute')) implementation project(path: ':libs:native') implementation project(path: ':libs:simdvec') + implementation project(path: ':libs:exponential-histogram') expression(project(path: ':modules:lang-expression', configuration: 'zip')) painless(project(path: ':modules:lang-painless', configuration: 'zip')) nativeLib(project(':libs:native')) diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/exponentialhistogram/ExponentialHistogramGenerationBench.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/exponentialhistogram/ExponentialHistogramGenerationBench.java new file mode 100644 index 0000000000000..954b627a40a67 --- /dev/null +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/exponentialhistogram/ExponentialHistogramGenerationBench.java @@ -0,0 +1,99 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.benchmark.exponentialhistogram; + +import org.elasticsearch.exponentialhistogram.ExponentialHistogramGenerator; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.profile.GCProfiler; +import org.openjdk.jmh.profile.StackProfiler; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@Warmup(iterations = 3, time = 3, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 2, timeUnit = TimeUnit.SECONDS) +@Fork(1) +@Threads(1) +@State(Scope.Thread) +public class ExponentialHistogramGenerationBench { + + @Param({ "100", "500", "1000", "5000", "10000", "20000" }) + int bucketCount; + + @Param({ "NORMAL", "GAUSSIAN" }) + String distribution; + + Random random; + ExponentialHistogramGenerator histoGenerator; + + double[] data = new double[1000000]; + + @Setup + public void setUp() { + random = ThreadLocalRandom.current(); + histoGenerator = new ExponentialHistogramGenerator(bucketCount); + + Supplier nextRandom = () -> distribution.equals("GAUSSIAN") ? 
random.nextGaussian() : random.nextDouble(); + + // Make sure that we start with a non-empty histogram, as this distorts initial additions + for (int i = 0; i < 10000; ++i) { + histoGenerator.add(nextRandom.get()); + } + + for (int i = 0; i < data.length; ++i) { + data[i] = nextRandom.get(); + } + } + + @State(Scope.Thread) + public static class ThreadState { + int index = 0; + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void add(ThreadState state) { + if (state.index >= data.length) { + state.index = 0; + } + histoGenerator.add(data[state.index++]); + } + + public static void main(String[] args) throws RunnerException { + Options opt = new OptionsBuilder().include(".*" + ExponentialHistogramGenerationBench.class.getSimpleName() + ".*") + .warmupIterations(5) + .measurementIterations(5) + .addProfiler(GCProfiler.class) + .addProfiler(StackProfiler.class) + .build(); + + new Runner(opt).run(); + } +} diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/exponentialhistogram/ExponentialHistogramMergeBench.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/exponentialhistogram/ExponentialHistogramMergeBench.java new file mode 100644 index 0000000000000..22e38b24cc886 --- /dev/null +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/exponentialhistogram/ExponentialHistogramMergeBench.java @@ -0,0 +1,113 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.benchmark.exponentialhistogram; + +import org.elasticsearch.exponentialhistogram.BucketIterator; +import org.elasticsearch.exponentialhistogram.ExponentialHistogram; +import org.elasticsearch.exponentialhistogram.ExponentialHistogramGenerator; +import org.elasticsearch.exponentialhistogram.ExponentialHistogramMerger; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.List; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@Warmup(iterations = 3, time = 3, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 2, timeUnit = TimeUnit.SECONDS) +@Fork(1) +@Threads(1) +@State(Scope.Thread) +public class ExponentialHistogramMergeBench { + + @Param({ "1000", "5000" }) + int bucketCount; + + @Param({ "0.01", "0.1", "0.25", "0.5", "1.0", "2.0" }) + double mergedHistoSizeFactor; + + Random random; + ExponentialHistogramMerger histoMerger; + + ExponentialHistogram[] toMerge = new ExponentialHistogram[10_000]; + + @Setup + public void setUp() { + random = ThreadLocalRandom.current(); + histoMerger = new ExponentialHistogramMerger(bucketCount); + + ExponentialHistogramGenerator initial = new ExponentialHistogramGenerator(bucketCount); + for (int j = 0; j < bucketCount; j++) { + initial.add(Math.pow(1.001, j)); + } + ExponentialHistogram initialHisto = initial.get(); + int cnt = getBucketCount(initialHisto); + if (cnt < bucketCount) { + throw new IllegalArgumentException("Expected bucket count to be " + bucketCount + ", but was " + cnt); + } + histoMerger.add(initialHisto); + + int dataPointSize = (int) Math.round(bucketCount * mergedHistoSizeFactor); + + for (int i = 0; i < toMerge.length; i++) { + ExponentialHistogramGenerator generator = new ExponentialHistogramGenerator(dataPointSize); + + int bucketIndex = 0; + for (int j = 0; j < dataPointSize; j++) { + bucketIndex += 1 + random.nextInt(bucketCount) % (Math.max(1, bucketCount / dataPointSize)); + generator.add(Math.pow(1.001, bucketIndex)); + } + toMerge[i] = generator.get(); + cnt = getBucketCount(toMerge[i]); + if (cnt < dataPointSize) { + throw new IllegalArgumentException("Expected bucket count to be " + dataPointSize + ", but was " + cnt); + } + } + } + + private static int getBucketCount(ExponentialHistogram histo) { + int cnt = 0; + for (BucketIterator it : List.of(histo.negativeBuckets().iterator(), histo.positiveBuckets().iterator())) { + while (it.hasNext()) { + cnt++; + it.advance(); + } + } + return cnt; + } + + @State(Scope.Thread) + public static class ThreadState { + int index = 0; + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void add(ThreadState state) { + if (state.index >= toMerge.length) { + state.index = 0; + } + histoMerger.add(toMerge[state.index++]); + } +} diff --git a/gradle/verification-metadata.xml b/gradle/verification-metadata.xml index bb4ae5da279fb..dd6a6276d7c38 100644 --- 
a/gradle/verification-metadata.xml +++ b/gradle/verification-metadata.xml @@ -66,6 +66,11 @@ + + + + + diff --git a/libs/exponential-histogram/README.md b/libs/exponential-histogram/README.md new file mode 100644 index 0000000000000..1331428304f22 --- /dev/null +++ b/libs/exponential-histogram/README.md @@ -0,0 +1,103 @@ +This library provides an implementation of merging and analysis algorithms for exponential histograms based on the [OpenTelemetry definition](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram). It is designed as a complementary tool to the OpenTelemetry SDK, focusing specifically on efficient histogram merging and accurate percentile estimation. + +## Overview + +The library implements base-2 exponential histograms with perfect subsetting. The most important properties are: + +* The histogram has a scale parameter, which defines the accuracy. A higher scale implies a higher accuracy. +* The `base` for the buckets is defined as `base = 2^(2^-scale)`. +* The histogram bucket at index `i` has the range `(base^i, base^(i+1)]` +* Negative values are represented by a separate negative range of buckets with the boundaries `(-base^(i+1), -base^i]` +* Histograms are perfectly subsetting: increasing the scale by one merges each pair of neighboring buckets +* A special zero bucket with a zero-threshold is used to handle zero and close-to-zero values + +For more details please refer to the [OpenTelemetry definition](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram). + +The library implements a sparse storage approach where only populated buckets consume memory and count towards the bucket limit. This differs from the OpenTelemetry implementation, which uses dense storage. While dense storage allows for O(1) time insertion of individual values, our sparse representation requires O(log m) time where m is the bucket capacity. However, the sparse representation enables more efficient storage and provides a simple merging algorithm with runtime linear in the number of populated buckets. Additionally, this library provides an array-backed sparse storage, ensuring cache efficiency. + +The sparse storage approach offers significant advantages for distributions with fewer distinct values than the bucket count, allowing the library to achieve representation of such distributions with an error so small that it won't be noticed in practice. This makes it suitable not only for exponential histograms but also as a universal solution for handling explicit bucket histograms. + +## Merging Algorithm + +The merging algorithm works similarly to the merge-step of merge sort. We simultaneously walk through the buckets of both histograms in order, merging them on the fly as needed. If the total number of buckets in the end would exceed the bucket limit, we scale down as needed. + +Before we merge the buckets, we need to take care of the special zero-bucket and bring both histograms to the same scale. + +For the zero-bucket, we merge the zero threshold from both histograms and collapse any overlapping buckets into the resulting new zero bucket. + +In order to bring both histograms to the same scale, we can make adjustments in both directions: we can increase or decrease the scale of histograms as needed. + +See the [upscaling section](#upscaling) for details on how the upscaling works.
Upscaling helps prevent the precision of the result histogram merged from many histograms from being dragged down to the lowest scale of a potentially misconfigured input histogram. For example, if a histogram is recorded with a too low zero threshold, this can result in a degraded scale when using dense histogram storage, even if the histogram only contains two points. + +### Upscaling + +In general, we assume that all values in a bucket lie on a single point: the point of least relative error. This is the point `x` in the bucket such that: + +``` +(x - l) / l = (u - x) / u +``` + +Where `l` is the lower bucket boundary and `u` is the upper bucket boundary. + +This assumption allows us to increase the scale of histograms without increasing the bucket count. Buckets are simply mapped to the ones in the new scale containing the point of least relative error of the original buckets. + +This can introduce a small error, as the original center might be moved slightly. Therefore, we ensure that the upscaling happens at most once to prevent errors from adding up. The higher the amount of upscaling, the less the error (higher scale means smaller buckets, which in turn means we get a better fit around the original point of least relative error). + +## Distributions with Few Distinct Values + +The sparse storage model only requires memory linear to the total number of buckets, while dense storage needs to store the entire range of the smallest and biggest buckets. + +This offers significant benefits for distributions with fewer distinct values: +If we have at least as many buckets as we have distinct values to store in the histogram, we can represent this distribution with a much smaller error than the dense representation. +This can be achieved by maintaining the scale at the maximum supported value (so the buckets become the smallest). +At the time of writing, the maximum scale is 38, so the relative distance between the lower and upper bucket boundaries is (2^2^(-38)). + +The impact of the error is best shown with a concrete example: +If we store, for example, a duration value of 10^15 nanoseconds (= roughly 11.5 days), this value will be stored in a bucket that guarantees a relative error of at most 2^2^(-38), so roughly 2.5 microseconds in this case. +As long as the number of values we insert is lower than the bucket count, we are guaranteed that no down-scaling happens: In contrast to dense storage, the scale does not depend on the spread between the smallest and largest bucket index. + +To clarify the difference between dense and sparse storage, let's assume that we have an empty histogram and the maximum scale is zero while the maximum bucket count is four. +The same logic applies to higher scales and bucket counts, but we use these values to get easier numbers for this example. +The scale of zero means that our bucket boundaries are `1, 2, 4, 8, 16, 32, 64, 128, 256, ...`. +We now want to insert the value `6` into the histogram. The dense storage works by storing an array for the bucket counts plus an initial offset. +This means that the first slot in the bucket counts array corresponds to the bucket with index `offset` and the last one to `offset + bucketCounts.length - 1`. +So if we add the value `6` to the histogram, it falls into the `(4,8]` bucket, which has the index `2`. 
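As a quick sanity check (a standalone snippet using plain JDK math rather than the library's own index computation), the scale-0 bucket index of a value that is not an exact power of two is simply `floor(log2(value))`; the values `6`, `20` and `100` used in this example land at indices `2`, `4` and `6`:

```java
public class Scale0IndexCheck {
    public static void main(String[] args) {
        for (double value : new double[] { 6, 20, 100 }) {
            // index i such that value falls into (2^i, 2^(i+1)] at scale 0
            // (values that are exact powers of two would need extra care because of the upper-inclusive boundary)
            long index = (long) Math.floor(Math.log(value) / Math.log(2));
            System.out.println(value + " -> bucket index " + index); // prints 2, 4 and 6
        }
    }
}
```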
+ +So our dense histogram looks like this: +``` +offset = 2 +bucketCounts = [1, 0, 0, 0] // represent bucket counts for bucket index 2 to 5 +``` + +If we now insert the value `20` (`(16,32]`, bucket index 4), everything is still fine: +``` +offset = 2 +bucketCounts = [1, 0, 1, 0] // represent bucket counts for bucket index 2 to 5 +``` + +However, we run into trouble if we insert the value `100`, which corresponds to index 6: That index is outside of the bounds of our array. +We can't just increase the `offset`, because the first bucket in our array is populated too. +We have no other option other than decreasing the scale of the histogram, to make sure that our values `6` and `100` fall in the range of four **consecutive** buckets due to the bucket count limit of the dense storage. + +In contrast, a sparse histogram has no trouble storing this data while keeping the scale of zero: +``` +bucketIndiciesToCounts: { + "2" : 1, + "4" : 1, + "6" : 1 +} +``` + +Downscaling on the sparse representation only happens if either: + * The number of populated buckets would become bigger than our maximum bucket count. We have to downscale to make neighboring, populated buckets combine to a single bucket until we are below our limit again. + * The highest or smallest indices require more bits to store than we allow. This does not happen in our implementation for normal inputs, because we allow up to 62 bits for index storage, which fits the entire numeric range of IEEE 754 double precision floats at our maximum scale. + +### Handling Explicit Bucket Histograms + +We can make use of this property to convert explicit bucket histograms (https://opentelemetry.io/docs/specs/otel/metrics/data-model/#histogram) to exponential ones by again assuming that all values in a bucket lie in a single point: + * For each explicit bucket, we take its point of least relative error and add it to the corresponding exponential histogram bucket with the corresponding count. + * The open, upper, and lower buckets, including infinity, will need special treatment, but these are not useful for percentile estimates anyway. + +This gives us a great solution for universally dealing with histograms: +When merging exponential histograms generated from explicit ones, the scale is not decreased (and therefore the error not increased) as long as the number of distinct buckets from the original explicit bucket histograms does not exceed the exponential histogram bucket count. As a result, the computed percentiles will be precise with only the [relative error of the initial conversion](#distributions-with-few-distinct-values). +In addition, this allows us to compute percentiles on mixed explicit bucket histograms or even mix them with exponential ones by just using the exponential histogram algorithms. diff --git a/libs/exponential-histogram/build.gradle b/libs/exponential-histogram/build.gradle new file mode 100644 index 0000000000000..6451931297597 --- /dev/null +++ b/libs/exponential-histogram/build.gradle @@ -0,0 +1,23 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +// TODO: publish this when ready? 
+//apply plugin: 'elasticsearch.publish' +apply plugin: 'elasticsearch.build' + +dependencies { + testImplementation(project(":test:framework")) + testImplementation('ch.obermuhlner:big-math:2.3.2') + testImplementation('org.apache.commons:commons-math3:3.6.1') +} + +tasks.named('forbiddenApisMain').configure { + // this lib does not depend on core, so only jdk signatures should be checked + replaceSignatureFiles 'jdk-signatures' +} diff --git a/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/BucketIterator.java b/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/BucketIterator.java new file mode 100644 index 0000000000000..f581e00a0bbc3 --- /dev/null +++ b/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/BucketIterator.java @@ -0,0 +1,61 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.exponentialhistogram; + +/** + * An iterator over the non-empty buckets of the histogram for either the positive or negative range. + *
    + *
+ * <ul>
+ *     <li>The iterator always iterates from the lowest bucket index to the highest.</li>
+ *     <li>The iterator never returns duplicate buckets (buckets with the same index).</li>
+ *     <li>The iterator never returns empty buckets ({@link #peekCount()} is never zero).</li>
+ * </ul>
+ */ +public interface BucketIterator { + /** + * Checks if there are any buckets remaining to be visited by this iterator. + * If the end has been reached, it is illegal to call {@link #peekCount()}, {@link #peekIndex()}, or {@link #advance()}. + * + * @return {@code true} if the iterator has more elements, {@code false} otherwise + */ + boolean hasNext(); + + /** + * The number of items in the bucket at the current iterator position. Does not advance the iterator. + * Must not be called if {@link #hasNext()} returns {@code false}. + * + * @return the number of items in the bucket, always greater than zero + */ + long peekCount(); + + /** + * The index of the bucket at the current iterator position. Does not advance the iterator. + * Must not be called if {@link #hasNext()} returns {@code false}. + * + * @return the index of the bucket, guaranteed to be in the range + * [{@link ExponentialHistogram#MIN_INDEX}, + * {@link ExponentialHistogram#MAX_INDEX}] + */ + long peekIndex(); + + /** + * Moves the iterator to the next, non-empty bucket. + * If {@link #hasNext()} is {@code true} after calling {@link #advance()}, {@link #peekIndex()} is guaranteed to return a value + * greater than the value returned prior to the {@link #advance()} call. + */ + void advance(); + + /** + * Provides the scale that can be used to convert indices returned by {@link #peekIndex()} to the bucket boundaries, + * e.g., via {@link ExponentialScaleUtils#getLowerBucketBoundary(long, int)}. + * + * @return the scale, which is guaranteed to be constant over the lifetime of this iterator + */ + int scale(); +} diff --git a/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/CopyableBucketIterator.java b/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/CopyableBucketIterator.java new file mode 100644 index 0000000000000..196a44b25d861 --- /dev/null +++ b/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/CopyableBucketIterator.java @@ -0,0 +1,24 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.exponentialhistogram; + +/** + * A {@link BucketIterator} that can be copied. + */ +public interface CopyableBucketIterator extends BucketIterator { + + /** + * Creates a copy of this bucket iterator, pointing at the same bucket of the same range of buckets. + * Calling {@link #advance()} on the copied iterator does not affect this instance and vice-versa. + * + * @return a copy of this iterator + */ + CopyableBucketIterator copy(); +} diff --git a/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/DownscaleStats.java b/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/DownscaleStats.java new file mode 100644 index 0000000000000..23eebbddd1d58 --- /dev/null +++ b/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/DownscaleStats.java @@ -0,0 +1,110 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.exponentialhistogram; + +import java.util.Arrays; + +import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_INDEX; +import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_INDEX_BITS; +import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MIN_INDEX; + +/** + * A data structure for efficiently computing the required scale reduction for a histogram to reach a target number of buckets. + * This works by examining pairs of neighboring buckets and determining at which scale reduction they would merge into a single bucket. + */ +class DownscaleStats { + + // collapsedBucketCount[i] stores the number of additional + // collapsed buckets when increasing the scale by (i+1) instead of just by (i) + int[] collapsedBucketCount = new int[MAX_INDEX_BITS]; + + /** + * Resets the data structure to its initial state. + */ + void reset() { + Arrays.fill(collapsedBucketCount, 0); + } + + /** + * Adds a pair of neighboring bucket indices to track for potential merging. + * + * @param previousBucketIndex the index of the previous bucket + * @param currentBucketIndex the index of the current bucket + */ + void add(long previousBucketIndex, long currentBucketIndex) { + if (currentBucketIndex <= previousBucketIndex) { + throw new IllegalArgumentException("currentBucketIndex must be greater than previousBucketIndex"); + } + if (currentBucketIndex < MIN_INDEX || currentBucketIndex > MAX_INDEX) { + throw new IllegalArgumentException("currentBucketIndex must be in the range [" + MIN_INDEX + "..." + MAX_INDEX + "]"); + } + if (previousBucketIndex < MIN_INDEX || previousBucketIndex > MAX_INDEX) { + throw new IllegalArgumentException("previousBucketIndex must be in the range [" + MIN_INDEX + "..." + MAX_INDEX + "]"); + } + /* + * Below is an efficient variant of the following algorithm: + * for (int i=0; i<63; i++) { + * if (prevIndex>>(i+1) == currIndex>>(i+1)) { + * collapsedBucketCount[i]++; + * break; + * } + * } + * So we find the smallest scale reduction required to make the two buckets collapse into one. + */ + long bitXor = previousBucketIndex ^ currentBucketIndex; + int numEqualLeadingBits = Long.numberOfLeadingZeros(bitXor); + // if there are zero equal leading bits, the indices have a different sign. + // Therefore right-shifting will never make the buckets combine + if (numEqualLeadingBits > 0) { + int requiredScaleChange = 64 - numEqualLeadingBits; + collapsedBucketCount[requiredScaleChange - 1]++; + } + } + + /** + * Returns the number of buckets that will be merged after applying the given scale reduction. 
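+ * For example, after a single call {@code add(4, 6)}, the indices {@code 4} (binary {@code 100}) and {@code 6} (binary {@code 110})
+ * only fall into the same bucket once both are right-shifted by two bits ({@code 4 >> 2 == 6 >> 2 == 1}), so this method
+ * would return {@code 0} for reductions of {@code 0} or {@code 1}, and {@code 1} for any reduction of {@code 2} or more
+ * (assuming no other pairs were added).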
+ * + * @param reduction the scale reduction factor + * @return the number of buckets that will be merged + */ + int getCollapsedBucketCountAfterScaleReduction(int reduction) { + if (reduction < 0 || reduction > MAX_INDEX_BITS) { + throw new IllegalArgumentException("reduction must be between 0 and " + (MAX_INDEX_BITS)); + } + int totalCollapsed = 0; + for (int i = 0; i < reduction; i++) { + totalCollapsed += collapsedBucketCount[i]; + } + return totalCollapsed; + } + + /** + * Returns the required scale reduction to reduce the number of buckets by at least the given amount. + * + * @param desiredCollapsedBucketCount the target number of buckets to collapse + * @return the required scale reduction + */ + int getRequiredScaleReductionToReduceBucketCountBy(int desiredCollapsedBucketCount) { + if (desiredCollapsedBucketCount < 0) { + throw new IllegalArgumentException("desiredCollapsedBucketCount must be greater than or equal to 0"); + } + if (desiredCollapsedBucketCount == 0) { + return 0; + } + int totalCollapsed = 0; + for (int i = 0; i < collapsedBucketCount.length; i++) { + totalCollapsed += collapsedBucketCount[i]; + if (totalCollapsed >= desiredCollapsedBucketCount) { + return i + 1; + } + } + throw new IllegalArgumentException("Cannot reduce the bucket count by " + desiredCollapsedBucketCount); + } +} diff --git a/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/ExponentialHistogram.java b/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/ExponentialHistogram.java new file mode 100644 index 0000000000000..355d344fa4713 --- /dev/null +++ b/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/ExponentialHistogram.java @@ -0,0 +1,103 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.exponentialhistogram; + +import java.util.OptionalLong; + +/** + * Interface for implementations of exponential histograms adhering to the + * OpenTelemetry definition. + * This interface supports sparse implementations, allowing iteration over buckets without requiring direct index access.
+ * The most important properties are:
    + *
+ * <ul>
+ *     <li>The histogram has a scale parameter, which defines the accuracy. A higher scale implies a higher accuracy.
+ *     The {@code base} for the buckets is defined as {@code base = 2^(2^-scale)}.</li>
+ *     <li>The histogram bucket at index {@code i} has the range {@code (base^i, base^(i+1)]}</li>
+ *     <li>Negative values are represented by a separate negative range of buckets with the boundaries {@code (-base^(i+1), -base^i]}</li>
+ *     <li>Histograms are perfectly subsetting: increasing the scale by one merges each pair of neighboring buckets</li>
+ *     <li>A special {@link ZeroBucket} is used to handle zero and close-to-zero values</li>
+ * </ul>
+ * + *
+ * Additionally, all algorithms assume that samples within a bucket are located at a single point: the point of least relative error + * (see {@link ExponentialScaleUtils#getPointOfLeastRelativeError(long, int)}). + */ +public interface ExponentialHistogram { + + // TODO(b/128622): support min/max/sum/count storage and merging. + // TODO(b/128622): Add special positive and negative infinity buckets + // to allow representation of explicit bucket histograms with open boundaries. + + // A scale of 38 is the largest scale where we don't run into problems at the borders due to floating-point precision when computing + // indices for double values. + // Theoretically, a MAX_SCALE of 51 would work and would still cover the entire range of double values. + // For that to work, the math for converting from double to indices and back would need to be reworked. + // One option would be to use "Quadruple": https://github.com/m-vokhm/Quadruple + int MAX_SCALE = 38; + + // At this scale, all double values fall into a single bucket. + int MIN_SCALE = -11; + + // Only use 62 bits (plus the sign bit) at max to allow computing the difference between the smallest and largest index without causing + // an overflow. + // The extra bit also provides room for compact storage tricks. + int MAX_INDEX_BITS = 62; + long MAX_INDEX = (1L << MAX_INDEX_BITS) - 1; + long MIN_INDEX = -MAX_INDEX; + + /** + * The scale of the histogram. Higher scales result in higher accuracy but potentially more buckets. + * Must be less than or equal to {@link #MAX_SCALE} and greater than or equal to {@link #MIN_SCALE}. + * + * @return the scale of the histogram + */ + int scale(); + + /** + * @return the {@link ZeroBucket} representing the number of zero (or close-to-zero) values and its threshold + */ + ZeroBucket zeroBucket(); + + /** + * @return a {@link Buckets} instance for the populated buckets covering the positive value range of this histogram. + * The {@link BucketIterator#scale()} of iterators obtained via {@link Buckets#iterator()} must be the same as {@link #scale()}. + */ + Buckets positiveBuckets(); + + /** + * @return a {@link Buckets} instance for the populated buckets covering the negative value range of this histogram. + * The {@link BucketIterator#scale()} of iterators obtained via {@link Buckets#iterator()} must be the same as {@link #scale()}. + */ + Buckets negativeBuckets(); + + /** + * Represents a bucket range of an {@link ExponentialHistogram}, either the positive or the negative range. + */ + interface Buckets { + + /** + * @return a {@link BucketIterator} for the populated buckets of this bucket range. + * The {@link BucketIterator#scale()} of the returned iterator must be the same as {@link #scale()}. + */ + CopyableBucketIterator iterator(); + + /** + * @return the highest populated bucket index, or an empty optional if no buckets are populated + */ + OptionalLong maxBucketIndex(); + + /** + * @return the sum of the counts across all buckets of this range + */ + long valueCount(); + + } + +} diff --git a/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramGenerator.java b/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramGenerator.java new file mode 100644 index 0000000000000..296b060f0d23f --- /dev/null +++ b/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramGenerator.java @@ -0,0 +1,150 @@ +/* + * Copyright Elasticsearch B.V. 
and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.exponentialhistogram; + +import java.util.Arrays; + +import static org.elasticsearch.exponentialhistogram.ExponentialScaleUtils.computeIndex; + +/** + * A class for accumulating raw values into an {@link ExponentialHistogram} with a given maximum number of buckets. + * + * If the number of values is less than or equal to the bucket capacity, the resulting histogram is guaranteed + * to represent the exact raw values with a relative error less than {@code 2^(2^-MAX_SCALE) - 1}. + */ +public class ExponentialHistogramGenerator { + + // Merging individual values into a histogram would be way too slow with our sparse, array-backed histogram representation. + // Therefore, for a bucket capacity of c, we first buffer c raw values to be inserted. + // We then turn those into an "exact" histogram, which in turn we merge with our actual result accumulator. + // This yields an amortized runtime of O(log(c)). + private final double[] rawValueBuffer; + int valueCount; + + private final ExponentialHistogramMerger resultMerger; + private final FixedCapacityExponentialHistogram valueBuffer; + + private boolean isFinished = false; + + /** + * Creates a new instance with the specified maximum number of buckets. + * + * @param maxBucketCount the maximum number of buckets for the generated histogram + */ + public ExponentialHistogramGenerator(int maxBucketCount) { + rawValueBuffer = new double[maxBucketCount]; + valueCount = 0; + valueBuffer = new FixedCapacityExponentialHistogram(maxBucketCount); + resultMerger = new ExponentialHistogramMerger(maxBucketCount); + } + + /** + * Adds the given value to the histogram. + * Must not be called after {@link #get()} has been called. + * + * @param value the value to add + */ + public void add(double value) { + if (isFinished) { + throw new IllegalStateException("get() has already been called"); + } + if (valueCount == rawValueBuffer.length) { + mergeValuesToHistogram(); + } + rawValueBuffer[valueCount] = value; + valueCount++; + } + + /** + * Returns the histogram representing the distribution of all accumulated values. + * + * @return the histogram representing the distribution of all accumulated values + */ + public ExponentialHistogram get() { + isFinished = true; + mergeValuesToHistogram(); + return resultMerger.get(); + } + + /** + * Creates a histogram representing the distribution of the given values. + * The histogram will have a bucket count of at most the length of the provided array + * and will have a relative error less than {@code 2^(2^-MAX_SCALE) - 1}. + * + * @param values the values to be added to the histogram + * @return a new {@link ExponentialHistogram} + */ + public static ExponentialHistogram createFor(double... values) { + return createFor(values.length, values); + } + + /** + * Creates a histogram representing the distribution of the given values with at most the given number of buckets. + * If the given bucketCount is greater than or equal to the number of values, the resulting histogram will have a + * relative error of less than {@code 2^(2^-MAX_SCALE) - 1}. 
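+ * <p>A small usage sketch (illustrative values only):
+ * <pre>{@code
+ * ExponentialHistogram histo = ExponentialHistogramGenerator.createFor(4, 0.1, 2.0, 3.5, 10.0);
+ * double median = ExponentialHistogramQuantile.getQuantile(histo, 0.5);
+ * }</pre>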
+ * + * @param bucketCount the maximum number of buckets + * @param values the values to be added to the histogram + * @return a new {@link ExponentialHistogram} + */ + public static ExponentialHistogram createFor(int bucketCount, double... values) { + ExponentialHistogramGenerator generator = new ExponentialHistogramGenerator(bucketCount); + for (double val : values) { + generator.add(val); + } + return generator.get(); + } + + private void mergeValuesToHistogram() { + if (valueCount == 0) { + return; + } + Arrays.sort(rawValueBuffer, 0, valueCount); + int negativeValuesCount = 0; + while (negativeValuesCount < valueCount && rawValueBuffer[negativeValuesCount] < 0) { + negativeValuesCount++; + } + + valueBuffer.reset(); + int scale = valueBuffer.scale(); + + // Buckets must be provided in ascending index-order + // for the negative range, smaller bigger correspond to -INF and smaller ones closer to zero + // therefore we have to iterate the negative values in reverse order, from the value closest to -INF to the value closest to zero + for (int i = negativeValuesCount - 1; i >= 0; i--) { + long count = 1; + long index = computeIndex(rawValueBuffer[i], scale); + while ((i - 1) >= 0 && computeIndex(rawValueBuffer[i - 1], scale) == index) { + i--; + count++; + } + valueBuffer.tryAddBucket(index, count, false); + } + + int zeroCount = 0; + while ((negativeValuesCount + zeroCount) < valueCount && rawValueBuffer[negativeValuesCount + zeroCount] == 0) { + zeroCount++; + } + valueBuffer.setZeroBucket(ZeroBucket.minimalWithCount(zeroCount)); + for (int i = negativeValuesCount + zeroCount; i < valueCount; i++) { + long count = 1; + long index = computeIndex(rawValueBuffer[i], scale); + while ((i + 1) < valueCount && computeIndex(rawValueBuffer[i + 1], scale) == index) { + i++; + count++; + } + valueBuffer.tryAddBucket(index, count, true); + } + + resultMerger.add(valueBuffer); + valueCount = 0; + } + +} diff --git a/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramMerger.java b/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramMerger.java new file mode 100644 index 0000000000000..e97c16add4049 --- /dev/null +++ b/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramMerger.java @@ -0,0 +1,170 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.exponentialhistogram; + +import static org.elasticsearch.exponentialhistogram.ExponentialScaleUtils.getMaximumScaleIncrease; + +/** + * Allows accumulating multiple {@link ExponentialHistogram} into a single one + * while keeping the bucket count in the result below a given limit. 
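+ * <p>A rough usage sketch (assuming {@code histoA} and {@code histoB} are histograms produced elsewhere, e.g. via
+ * {@link ExponentialHistogramGenerator}):
+ * <pre>{@code
+ * ExponentialHistogramMerger merger = new ExponentialHistogramMerger(100);
+ * merger.add(histoA);
+ * merger.add(histoB);
+ * ExponentialHistogram merged = merger.get();
+ * }</pre>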
+ */ +public class ExponentialHistogramMerger { + + // Our algorithm is not in-place, therefore we use two histograms and ping-pong between them + private FixedCapacityExponentialHistogram result; + private FixedCapacityExponentialHistogram buffer; + + private final DownscaleStats downscaleStats; + + private boolean isFinished; + + /** + * Creates a new instance with the specified bucket limit. + * + * @param bucketLimit the maximum number of buckets the result histogram is allowed to have + */ + public ExponentialHistogramMerger(int bucketLimit) { + downscaleStats = new DownscaleStats(); + result = new FixedCapacityExponentialHistogram(bucketLimit); + buffer = new FixedCapacityExponentialHistogram(bucketLimit); + } + + // Only intended for testing, using this in production means an unnecessary reduction of precision + private ExponentialHistogramMerger(int bucketLimit, int minScale) { + this(bucketLimit); + result.resetBuckets(minScale); + buffer.resetBuckets(minScale); + } + + static ExponentialHistogramMerger createForTesting(int bucketLimit, int minScale) { + return new ExponentialHistogramMerger(bucketLimit, minScale); + } + + /** + * Merges the given histogram into the current result. + * Must not be called after {@link #get()} has been called. + * + * @param toAdd the histogram to merge + */ + public void add(ExponentialHistogram toAdd) { + if (isFinished) { + throw new IllegalStateException("get() has already been called"); + } + doMerge(toAdd); + } + + /** + * Returns the merged histogram. + * + * @return the merged histogram + */ + public ExponentialHistogram get() { + isFinished = true; + return result; + } + + // TODO(b/128622): this algorithm is very efficient if b has roughly as many buckets as a + // However, if b is much smaller we still have to iterate over all buckets of a which is very wasteful. + // This can be optimized by buffering multiple histograms to accumulate first, + // then in O(log(n)) turn them into a single, merged histogram. + // (n is the number of buffered buckets) + + private void doMerge(ExponentialHistogram b) { + + ExponentialHistogram a = result; + + CopyableBucketIterator posBucketsA = a.positiveBuckets().iterator(); + CopyableBucketIterator negBucketsA = a.negativeBuckets().iterator(); + CopyableBucketIterator posBucketsB = b.positiveBuckets().iterator(); + CopyableBucketIterator negBucketsB = b.negativeBuckets().iterator(); + + ZeroBucket zeroBucket = a.zeroBucket().merge(b.zeroBucket()); + zeroBucket = zeroBucket.collapseOverlappingBuckets(posBucketsA, negBucketsA, posBucketsB, negBucketsB); + + buffer.setZeroBucket(zeroBucket); + + // We attempt to bring everything to the scale of A. + // This might involve increasing the scale for B, which would increase its indices. + // We need to ensure that we do not exceed MAX_INDEX / MIN_INDEX in this case. 
+ int targetScale = a.scale(); + if (targetScale > b.scale()) { + if (negBucketsB.hasNext()) { + long smallestIndex = negBucketsB.peekIndex(); + long highestIndex = b.negativeBuckets().maxBucketIndex().getAsLong(); + int maxScaleIncrease = Math.min(getMaximumScaleIncrease(smallestIndex), getMaximumScaleIncrease(highestIndex)); + targetScale = Math.min(targetScale, b.scale() + maxScaleIncrease); + } + if (posBucketsB.hasNext()) { + long smallestIndex = posBucketsB.peekIndex(); + long highestIndex = b.positiveBuckets().maxBucketIndex().getAsLong(); + int maxScaleIncrease = Math.min(getMaximumScaleIncrease(smallestIndex), getMaximumScaleIncrease(highestIndex)); + targetScale = Math.min(targetScale, b.scale() + maxScaleIncrease); + } + } + + // Now we are sure that everything fits numerically into targetScale. + // However, we might exceed our limit for the total number of buckets. + // Therefore, we try the merge optimistically. If we fail, we reduce the target scale to make everything fit. + + MergingBucketIterator positiveMerged = new MergingBucketIterator(posBucketsA.copy(), posBucketsB.copy(), targetScale); + MergingBucketIterator negativeMerged = new MergingBucketIterator(negBucketsA.copy(), negBucketsB.copy(), targetScale); + + buffer.resetBuckets(targetScale); + downscaleStats.reset(); + int overflowCount = putBuckets(buffer, negativeMerged, false, downscaleStats); + overflowCount += putBuckets(buffer, positiveMerged, true, downscaleStats); + + if (overflowCount > 0) { + // UDD-sketch approach: decrease the scale and retry. + int reduction = downscaleStats.getRequiredScaleReductionToReduceBucketCountBy(overflowCount); + targetScale -= reduction; + buffer.resetBuckets(targetScale); + positiveMerged = new MergingBucketIterator(posBucketsA, posBucketsB, targetScale); + negativeMerged = new MergingBucketIterator(negBucketsA, negBucketsB, targetScale); + overflowCount = putBuckets(buffer, negativeMerged, false, null); + overflowCount += putBuckets(buffer, positiveMerged, true, null); + + if (overflowCount > 0) { + throw new IllegalStateException("Should never happen, the histogram should have had enough space"); + } + } + FixedCapacityExponentialHistogram temp = result; + result = buffer; + buffer = temp; + } + + private static int putBuckets( + FixedCapacityExponentialHistogram output, + BucketIterator buckets, + boolean isPositive, + DownscaleStats downscaleStats + ) { + boolean collectDownScaleStatsOnNext = false; + long prevIndex = 0; + int overflowCount = 0; + while (buckets.hasNext()) { + long idx = buckets.peekIndex(); + if (collectDownScaleStatsOnNext) { + downscaleStats.add(prevIndex, idx); + } else { + collectDownScaleStatsOnNext = downscaleStats != null; + } + + if (output.tryAddBucket(idx, buckets.peekCount(), isPositive) == false) { + overflowCount++; + } + + prevIndex = idx; + buckets.advance(); + } + return overflowCount; + } + +} diff --git a/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramQuantile.java b/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramQuantile.java new file mode 100644 index 0000000000000..c15de67d4558e --- /dev/null +++ b/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramQuantile.java @@ -0,0 +1,147 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.exponentialhistogram; + +import java.util.OptionalLong; + +/** + * Provides quantile estimation for {@link ExponentialHistogram} instances. + */ +public class ExponentialHistogramQuantile { + + /** + * Estimates a quantile for the distribution represented by the given histogram. + * + * It returns the value of the element at rank {@code max(0, min(n - 1, (quantile * (n + 1)) - 1))}, where n is the total number of + * values and rank starts at 0. If the rank is fractional, the result is linearly interpolated from the values of the two + * neighboring ranks. + * + * @param histo the histogram representing the distribution + * @param quantile the quantile to query, in the range [0, 1] + * @return the estimated quantile value, or {@link Double#NaN} if the histogram is empty + */ + public static double getQuantile(ExponentialHistogram histo, double quantile) { + if (quantile < 0 || quantile > 1) { + throw new IllegalArgumentException("quantile must be in range [0, 1]"); + } + + long zeroCount = histo.zeroBucket().count(); + long negCount = histo.negativeBuckets().valueCount(); + long posCount = histo.positiveBuckets().valueCount(); + + long totalCount = zeroCount + negCount + posCount; + if (totalCount == 0) { + // Can't compute quantile on an empty histogram + return Double.NaN; + } + + double exactRank = quantile * (totalCount - 1); + long lowerRank = (long) Math.floor(exactRank); + long upperRank = (long) Math.ceil(exactRank); + double upperFactor = exactRank - lowerRank; + + ValueAndPreviousValue values = getElementAtRank(histo, upperRank); + + if (lowerRank == upperRank) { + return values.valueAtRank(); + } else { + return values.valueAtPreviousRank() * (1 - upperFactor) + values.valueAtRank() * upperFactor; + } + } + + /** + * @param valueAtPreviousRank the value at the rank before the desired rank, NaN if not applicable. 
+ * @param valueAtRank the value at the desired rank + */ + private record ValueAndPreviousValue(double valueAtPreviousRank, double valueAtRank) { + ValueAndPreviousValue negateAndSwap() { + return new ValueAndPreviousValue(-valueAtRank, -valueAtPreviousRank); + } + } + + private static ValueAndPreviousValue getElementAtRank(ExponentialHistogram histo, long rank) { + long negativeValuesCount = histo.negativeBuckets().valueCount(); + long zeroCount = histo.zeroBucket().count(); + if (rank < negativeValuesCount) { + if (rank == 0) { + return new ValueAndPreviousValue(Double.NaN, -getLastBucketMidpoint(histo.negativeBuckets())); + } else { + return getBucketMidpointForRank(histo.negativeBuckets().iterator(), negativeValuesCount - rank).negateAndSwap(); + } + } else if (rank < (negativeValuesCount + zeroCount)) { + if (rank == negativeValuesCount) { + // the element at the previous rank falls into the negative bucket range + return new ValueAndPreviousValue(-getFirstBucketMidpoint(histo.negativeBuckets()), 0.0); + } else { + return new ValueAndPreviousValue(0.0, 0.0); + } + } else { + ValueAndPreviousValue result = getBucketMidpointForRank( + histo.positiveBuckets().iterator(), + rank - negativeValuesCount - zeroCount + ); + if ((rank - 1) < negativeValuesCount) { + // previous value falls into the negative bucket range or has rank -1 and therefore doesn't exist + return new ValueAndPreviousValue(-getFirstBucketMidpoint(histo.negativeBuckets()), result.valueAtRank); + } else if ((rank - 1) < (negativeValuesCount + zeroCount)) { + // previous value falls into the zero bucket + return new ValueAndPreviousValue(0.0, result.valueAtRank); + } else { + return result; + } + } + } + + private static double getFirstBucketMidpoint(ExponentialHistogram.Buckets buckets) { + CopyableBucketIterator iterator = buckets.iterator(); + if (iterator.hasNext()) { + return ExponentialScaleUtils.getPointOfLeastRelativeError(iterator.peekIndex(), iterator.scale()); + } else { + return Double.NaN; + } + } + + private static double getLastBucketMidpoint(ExponentialHistogram.Buckets buckets) { + OptionalLong highestIndex = buckets.maxBucketIndex(); + if (highestIndex.isPresent()) { + return ExponentialScaleUtils.getPointOfLeastRelativeError(highestIndex.getAsLong(), buckets.iterator().scale()); + } else { + return Double.NaN; + } + } + + private static ValueAndPreviousValue getBucketMidpointForRank(BucketIterator buckets, long rank) { + long prevIndex = Long.MIN_VALUE; + long seenCount = 0; + while (buckets.hasNext()) { + seenCount += buckets.peekCount(); + if (rank < seenCount) { + double center = ExponentialScaleUtils.getPointOfLeastRelativeError(buckets.peekIndex(), buckets.scale()); + double prevCenter; + if (rank > 0) { + if ((rank - 1) >= (seenCount - buckets.peekCount())) { + // element at previous rank is in same bucket + prevCenter = center; + } else { + // element at previous rank is in the previous bucket + prevCenter = ExponentialScaleUtils.getPointOfLeastRelativeError(prevIndex, buckets.scale()); + } + } else { + // there is no previous element + prevCenter = Double.NaN; + } + return new ValueAndPreviousValue(prevCenter, center); + } + prevIndex = buckets.peekIndex(); + buckets.advance(); + } + throw new IllegalStateException("The total number of elements in the buckets is less than the desired rank."); + } +} diff --git a/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/ExponentialScaleUtils.java 
b/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/ExponentialScaleUtils.java new file mode 100644 index 0000000000000..0c4c8e66fd8b1 --- /dev/null +++ b/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/ExponentialScaleUtils.java @@ -0,0 +1,337 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.exponentialhistogram; + +import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_INDEX; +import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_INDEX_BITS; +import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_SCALE; +import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MIN_INDEX; +import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MIN_SCALE; + +/** + * A collection of utility methods for working with indices and scales of exponential bucket histograms. + */ +public class ExponentialScaleUtils { + + private static final double LN_2 = Math.log(2); + + /** + * This table is visible for testing to ensure it is up-to-date. + *
+ * For each scale from {@link ExponentialHistogram#MIN_SCALE} to {@link ExponentialHistogram#MAX_SCALE}, + * the table contains a pre-computed constant for up-scaling bucket indices. + * The constant is computed using the following formula: + * {@code 2^63 * (1 + 2^scale * (1 - log2(1 + 2^(2^-scale))))} + */ + static final long[] SCALE_UP_CONSTANT_TABLE = new long[] { + 4503599627370495L, + 9007199254740991L, + 18014398509481983L, + 36028797018963967L, + 72057594037927935L, + 144115188075855871L, + 288230376054894118L, + 576448062320457790L, + 1146436840887505800L, + 2104167428150631728L, + 3127054724296373505L, + 3828045265094622256L, + 4214097751025163417L, + 4412149414858430624L, + 4511824212543271281L, + 4561743405547877994L, + 4586713247558758689L, + 4599199449917992829L, + 4605442711287634239L, + 4608564361996858084L, + 4610125189854540715L, + 4610905604096266504L, + 4611295811256239977L, + 4611490914841115537L, + 4611588466634164420L, + 4611637242530765249L, + 4611661630479075212L, + 4611673824453231387L, + 4611679921440309624L, + 4611682969933848761L, + 4611684494180618332L, + 4611685256304003118L, + 4611685637365695511L, + 4611685827896541707L, + 4611685923161964805L, + 4611685970794676354L, + 4611685994611032129L, + 4611686006519210016L, + 4611686012473298960L, + 4611686015450343432L, + 4611686016938865668L, + 4611686017683126786L, + 4611686018055257345L, + 4611686018241322624L, + 4611686018334355264L, + 4611686018380871584L, + 4611686018404129744L, + 4611686018415758824L, + 4611686018421573364L, + 4611686018424480634L }; + + /** + * Computes the new index for a bucket when adjusting the scale of the histogram. + * This method supports both down-scaling (reducing the scale) and up-scaling. + * When up-scaling, it returns the bucket containing the point of least error of the original bucket. + * + * @param index the current bucket index to be adjusted + * @param currentScale the current scale + * @param scaleAdjustment the adjustment to make; the new scale will be {@code currentScale + scaleAdjustment} + * @return the index of the bucket in the new scale + */ + static long adjustScale(long index, int currentScale, int scaleAdjustment) { + checkIndexAndScaleBounds(index, currentScale); + + int newScale = currentScale + scaleAdjustment; + if (newScale < MIN_SCALE || newScale > MAX_SCALE) { + throw new IllegalArgumentException("adjusted scale must be in the range [" + MIN_SCALE + "..." + MAX_SCALE + "]"); + } + + if (scaleAdjustment <= 0) { + return index >> -scaleAdjustment; + } else { + if (scaleAdjustment > MAX_INDEX_BITS) { + throw new IllegalArgumentException("Scaling up more than " + MAX_INDEX_BITS + " does not make sense"); + } + // When scaling up, we want to return the bucket containing the point of least relative error. + // This bucket index can be computed as (index << adjustment) + offset. + // The offset is a constant that depends only on the scale and adjustment, not the index. + // The mathematically correct formula for the offset is: + // 2^adjustment * (1 + 2^currentScale * (1 - log2(1 + 2^(2^-currentScale)))) + // This is hard to compute with double-precision floating-point numbers due to rounding errors and is also expensive. + // Therefore, we precompute 2^63 * (1 + 2^currentScale * (1 - log2(1 + 2^(2^-currentScale)))) and store it + // in SCALE_UP_CONSTANT_TABLE for each scale. 
+ // This can then be converted to the correct offset by dividing with (2^(63-adjustment)), + // which is equivalent to a right shift with (63-adjustment) + long offset = SCALE_UP_CONSTANT_TABLE[currentScale - MIN_SCALE] >> (63 - scaleAdjustment); + return (index << scaleAdjustment) + offset; + } + } + + /** + * Compares the lower boundaries of two buckets, which may have different scales. + * This is equivalent to a mathematically correct comparison of the lower bucket boundaries. + * Note that this method allows for scales and indices of the full numeric range of the types. + * + * @param idxA the index of the first bucket + * @param scaleA the scale of the first bucket + * @param idxB the index of the second bucket + * @param scaleB the scale of the second bucket + * @return a negative integer, zero, or a positive integer as the first bucket's lower boundary is + * less than, equal to, or greater than the second bucket's lower boundary + */ + public static int compareExponentiallyScaledValues(long idxA, int scaleA, long idxB, int scaleB) { + if (scaleA > scaleB) { + return -compareExponentiallyScaledValues(idxB, scaleB, idxA, scaleA); + } + // scaleA <= scaleB + int shifts = scaleB - scaleA; + + long scaledDownB = idxB >> shifts; + int result = Long.compare(idxA, scaledDownB); + if (result == 0) { + // the scaled down values are equal + // this means that b is bigger if it has a "fractional" part, which corresponds to the bits that were removed on the right-shift + assert (1L << shifts) > 0; + long shiftedAway = idxB & ((1L << shifts) - 1); + if (shiftedAway > 0) { + return -1; + } else { + return 0; + } + } + return result; + } + + /** + * Returns the maximum permissible scale increase that does not cause the index to grow out + * of the [{@link ExponentialHistogram#MIN_INDEX}, {@link ExponentialHistogram#MIN_INDEX}] range. + * + * @param index the index to check + * @return the maximum permissible scale increase + */ + public static int getMaximumScaleIncrease(long index) { + checkIndexBounds(index); + // Scale increase by one corresponds to a left shift, which in turn is the same as multiplying by two. + // Because we know that MIN_INDEX = -MAX_INDEX, we can just compute the maximum increase of the absolute index. + // This allows us to reason only about non-negative indices further below. + index = Math.abs(index); + // the maximum scale increase is defined by how many left-shifts we can do without growing beyond MAX_INDEX + // MAX_INDEX is defined as a number where the left MAX_INDEX_BITS are all ones. + // So in other words, we must ensure that the leftmost (64 - MAX_INDEX_BITS) remain zero, + // which is exactly what the formula below does. + return Long.numberOfLeadingZeros(index) - (64 - MAX_INDEX_BITS); + } + + /** + * Returns the upper boundary of the bucket with the given index and scale. + * + * @param index the index of the bucket + * @param scale the scale of the bucket + * @return the upper boundary of the bucket + */ + public static double getUpperBucketBoundary(long index, int scale) { + checkIndexAndScaleBounds(index, scale); + return exponentiallyScaledToDoubleValue(index + 1, scale); + } + + /** + * Returns the lower boundary of the bucket with the given index and scale. + * + * @param index the index of the bucket in the [{@link ExponentialHistogram#MIN_INDEX}, {@link ExponentialHistogram#MAX_INDEX}] range. 
+ * @param scale the scale of the bucket + * @return the lower boundary of the bucket + */ + public static double getLowerBucketBoundary(long index, int scale) { + checkIndexAndScaleBounds(index, scale); + return exponentiallyScaledToDoubleValue(index, scale); + } + + /** + * Computes (2^(2^-scale))^index, + * allowing also indices outside of the [{@link ExponentialHistogram#MIN_INDEX}, {@link ExponentialHistogram#MAX_INDEX}] range. + */ + static double exponentiallyScaledToDoubleValue(long index, int scale) { + // Math.exp is expected to be faster and more accurate than Math.pow + // For that reason we use (2^(2^-scale))^index = 2^( (2^-scale) * index) = (e^ln(2))^( (2^-scale) * index) + // = e^( ln(2) * (2^-scale) * index) + double inverseFactor = Math.scalb(LN_2, -scale); + return Math.exp(inverseFactor * index); + } + + /** + * For a bucket with the given index, computes the point {@code x} in the bucket such that + * {@code (x - l) / l} equals {@code (u - x) / u}, where {@code l} is the lower bucket boundary and {@code u} + * is the upper bucket boundary. + *
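Solving that condition explicitly (a short derivation added for clarity, with b denoting the histogram base):

```latex
\frac{x - l}{l} = \frac{u - x}{u}
\;\Rightarrow\; ux - ul = lu - lx
\;\Rightarrow\; x(u + l) = 2ul
\;\Rightarrow\; x = \frac{2ul}{u + l} = \frac{2u}{b + 1},
\qquad \text{where } u = b\,l \text{ and } b = 2^{2^{-\text{scale}}}.
```

This matches the expression 2 / (histogramBase + 1) * upperBound used in the implementation below.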
+ * In other words, we select the point in the bucket that has the least relative error with respect to any other point in the bucket. + * + * @param bucketIndex the index of the bucket + * @param scale the scale of the bucket + * @return the point of least relative error + */ + public static double getPointOfLeastRelativeError(long bucketIndex, int scale) { + checkIndexAndScaleBounds(bucketIndex, scale); + double upperBound = getUpperBucketBoundary(bucketIndex, scale); + double histogramBase = Math.pow(2, Math.scalb(1, -scale)); + return 2 / (histogramBase + 1) * upperBound; + } + + /** + * Provides the index of the bucket of the exponential histogram with the given scale that contains the provided value. + * + * @param value the value to find the bucket for + * @param scale the scale of the histogram + * @return the index of the bucket + */ + public static long computeIndex(double value, int scale) { + checkScaleBounds(scale); + return Indexing.computeIndex(value, scale); + } + + private static void checkIndexAndScaleBounds(long index, int scale) { + checkIndexBounds(index); + checkScaleBounds(scale); + } + + private static void checkScaleBounds(int scale) { + if (scale < MIN_SCALE || scale > MAX_SCALE) { + throw new IllegalArgumentException("scale must be in range [" + MIN_SCALE + ".." + MAX_SCALE + "]"); + } + } + + private static void checkIndexBounds(long index) { + if (index < MIN_INDEX || index > MAX_INDEX) { + throw new IllegalArgumentException("index must be in range [" + MIN_INDEX + ".." + MAX_INDEX + "]"); + } + } + + /** + * The code in this class was copied and slightly adapted from the + * OpenTelemetry Base2ExponentialHistogramIndexer implementation, + * licensed under the Apache License 2.0. + */ + private static class Indexing { + + /** Bit mask used to isolate exponent of IEEE 754 double precision number. */ + private static final long EXPONENT_BIT_MASK = 0x7FF0000000000000L; + + /** Bit mask used to isolate the significand of IEEE 754 double precision number. */ + private static final long SIGNIFICAND_BIT_MASK = 0xFFFFFFFFFFFFFL; + + /** Bias used in representing the exponent of IEEE 754 double precision number. */ + private static final int EXPONENT_BIAS = 1023; + + /** + * The number of bits used to represent the significand of IEEE 754 double precision number, + * excluding the implicit bit. + */ + private static final int SIGNIFICAND_WIDTH = 52; + + /** The number of bits used to represent the exponent of IEEE 754 double precision number. */ + private static final int EXPONENT_WIDTH = 11; + + private static final double LOG_BASE2_E = 1D / LN_2; + + static long computeIndex(double value, int scale) { + double absValue = Math.abs(value); + // For positive scales, compute the index by logarithm, which is simpler but may be + // inaccurate near bucket boundaries + if (scale > 0) { + return getIndexByLogarithm(absValue, scale); + } + // For scale zero, compute the exact index by extracting the exponent + if (scale == 0) { + return mapToIndexScaleZero(absValue); + } + // For negative scales, compute the exact index by extracting the exponent and shifting it to + // the right by -scale + return mapToIndexScaleZero(absValue) >> -scale; + } + + /** + * Compute the bucket index using a logarithm based approach. 
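A self-contained sketch of this logarithm-based mapping and the bucket boundaries it implies (the helper and the chosen values are illustrative only, not part of the change):

```java
public class LogIndexSketch {
    // Index of the bucket containing value at a positive scale: ceil(log2(value) * 2^scale) - 1
    static long indexByLogarithm(double value, int scale) {
        double log2 = Math.log(value) / Math.log(2);
        return (long) Math.ceil(Math.scalb(log2, scale)) - 1;
    }

    public static void main(String[] args) {
        int scale = 3;                                        // 2^3 = 8 buckets per power of two
        double base = Math.pow(2, Math.scalb(1.0, -scale));   // bucket growth factor 2^(2^-scale)
        long index = indexByLogarithm(10.0, scale);           // 26
        double lower = Math.pow(base, index);                 // ~9.51
        double upper = Math.pow(base, index + 1);             // ~10.37
        // The chosen bucket (lower, upper] contains the value, up to floating-point rounding near boundaries.
        System.out.printf("index=%d lower=%.4f upper=%.4f%n", index, lower, upper);
    }
}
```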
+ * + * @see All + * Scales: Use the Logarithm Function + */ + private static long getIndexByLogarithm(double value, int scale) { + return (long) Math.ceil(Math.scalb(Math.log(value) * LOG_BASE2_E, scale)) - 1; + } + + /** + * Compute the exact bucket index for scale zero by extracting the exponent. + * + * @see Scale + * Zero: Extract the Exponent + */ + private static long mapToIndexScaleZero(double value) { + long rawBits = Double.doubleToLongBits(value); + long rawExponent = (rawBits & EXPONENT_BIT_MASK) >> SIGNIFICAND_WIDTH; + long rawSignificand = rawBits & SIGNIFICAND_BIT_MASK; + if (rawExponent == 0) { + rawExponent -= Long.numberOfLeadingZeros(rawSignificand - 1) - EXPONENT_WIDTH - 1; + } + int ieeeExponent = (int) (rawExponent - EXPONENT_BIAS); + if (rawSignificand == 0) { + return ieeeExponent - 1; + } + return ieeeExponent; + } + } +} diff --git a/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/FixedCapacityExponentialHistogram.java b/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/FixedCapacityExponentialHistogram.java new file mode 100644 index 0000000000000..d0618542984d9 --- /dev/null +++ b/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/FixedCapacityExponentialHistogram.java @@ -0,0 +1,254 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.exponentialhistogram; + +import java.util.OptionalLong; + +/** + * An implementation of a mutable {@link ExponentialHistogram} with a sparse, array-backed representation. + *
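Returning briefly to the scale-zero indexing above: for normal, positive doubles the bucket index is simply the unbiased IEEE 754 exponent, with exact powers of two assigned to the next-lower bucket. A small illustrative helper (not part of the PR; subnormal and non-positive inputs are deliberately ignored here):

```java
// Buckets at scale 0 are (2^i, 2^(i+1)]; this mirrors mapToIndexScaleZero for normal positive values.
static long indexAtScaleZero(double value) {
    int exponent = Math.getExponent(value);                                        // unbiased exponent
    boolean exactPowerOfTwo = (Double.doubleToLongBits(value) & 0x000FFFFFFFFFFFFFL) == 0;
    return exactPowerOfTwo ? exponent - 1 : exponent;
}

// indexAtScaleZero(8.0)  == 2   because 8 is the upper boundary of (4, 8]
// indexAtScaleZero(10.0) == 3   because 10 lies in (8, 16]
```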
+ * Consumers must ensure that if the histogram is mutated, all previously acquired {@link BucketIterator} + * instances are no longer used. + */ +final class FixedCapacityExponentialHistogram implements ExponentialHistogram { + + // These arrays represent both the positive and the negative buckets. + // They store all negative buckets first, in ascending index order, followed by all positive buckets, also in ascending index order. + private final long[] bucketIndices; + private final long[] bucketCounts; + + private int bucketScale; + + private final AbstractBuckets negativeBuckets = new AbstractBuckets() { + @Override + int startSlot() { + return 0; + } + }; + + private ZeroBucket zeroBucket; + + private final AbstractBuckets positiveBuckets = new AbstractBuckets() { + @Override + int startSlot() { + return negativeBuckets.numBuckets; + } + }; + + /** + * Creates an empty histogram with the given capacity and a {@link ZeroBucket#minimalEmpty()} zero bucket. + * The scale is initialized to the maximum possible precision ({@link #MAX_SCALE}). + * + * @param bucketCapacity the maximum total number of positive and negative buckets this histogram can hold. + */ + FixedCapacityExponentialHistogram(int bucketCapacity) { + bucketIndices = new long[bucketCapacity]; + bucketCounts = new long[bucketCapacity]; + reset(); + } + + /** + * Resets this histogram to the same state as a newly constructed one with the same capacity. + */ + public void reset() { + setZeroBucket(ZeroBucket.minimalEmpty()); + resetBuckets(MAX_SCALE); + } + + /** + * Removes all positive and negative buckets from this histogram and sets the scale to the given value. + * + * @param scale the scale to set for this histogram + */ + public void resetBuckets(int scale) { + if (scale > MAX_SCALE || scale < MIN_SCALE) { + throw new IllegalArgumentException("scale must be in range [" + MIN_SCALE + ".." + MAX_SCALE + "]"); + } + negativeBuckets.reset(); + positiveBuckets.reset(); + bucketScale = scale; + } + + @Override + public ZeroBucket zeroBucket() { + return zeroBucket; + } + + /** + * Replaces the zero bucket of this histogram with the given one. + * Callers must ensure that the given {@link ZeroBucket} does not + * overlap with any of the positive or negative buckets of this histogram. + * + * @param zeroBucket the zero bucket to set + */ + public void setZeroBucket(ZeroBucket zeroBucket) { + this.zeroBucket = zeroBucket; + } + + /** + * Attempts to add a bucket to the positive or negative range of this histogram. + *
+ * Callers must adhere to the following rules: + *
+ * <ul>
+ *     <li>All buckets from the negative range must be provided before the first one from the positive range.</li>
+ *     <li>For both the negative and positive ranges, buckets must be provided in ascending index order.</li>
+ *     <li>It is not allowed to provide the same bucket more than once.</li>
+ *     <li>It is not allowed to add empty buckets ({@code count <= 0}).</li>
+ * </ul>
+ * + * If any of these rules are violated, this call will fail with an exception. + * If the bucket cannot be added because the maximum capacity has been reached, the call will not modify the state + * of this histogram and will return {@code false}. + * + * @param index the index of the bucket to add + * @param count the count to associate with the given bucket + * @param isPositive {@code true} if the bucket belongs to the positive range, {@code false} if it belongs to the negative range + * @return {@code true} if the bucket was added, {@code false} if it could not be added due to insufficient capacity + */ + public boolean tryAddBucket(long index, long count, boolean isPositive) { + if (index < MIN_INDEX || index > MAX_INDEX) { + throw new IllegalArgumentException("index must be in range [" + MIN_INDEX + ".." + MAX_INDEX + "]"); + } + if (isPositive == false && positiveBuckets.numBuckets > 0) { + throw new IllegalArgumentException("Cannot add negative buckets after a positive bucket has been added"); + } + if (count <= 0) { + throw new IllegalArgumentException("Cannot add an empty or negative bucket"); + } + if (isPositive) { + return positiveBuckets.tryAddBucket(index, count); + } else { + return negativeBuckets.tryAddBucket(index, count); + } + } + + @Override + public int scale() { + return bucketScale; + } + + @Override + public Buckets negativeBuckets() { + return negativeBuckets; + } + + @Override + public Buckets positiveBuckets() { + return positiveBuckets; + } + + private abstract class AbstractBuckets implements Buckets { + + private int numBuckets; + private int cachedValueSumForNumBuckets; + private long cachedValueSum; + + AbstractBuckets() { + reset(); + } + + /** + * @return the array index of the first bucket of this set of buckets within {@link #bucketCounts} and {@link #bucketIndices}. 
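To make the insertion contract above concrete, a usage sketch in the style of the tests added later in this PR (illustrative only; it assumes code living in the same package, since the class is package-private):

```java
FixedCapacityExponentialHistogram histo = new FixedCapacityExponentialHistogram(4);
histo.resetBuckets(0);                           // scale 0: bucket boundaries are powers of two
histo.tryAddBucket(0, 3, false);                 // negative range first, ascending indices
histo.tryAddBucket(2, 1, false);
histo.tryAddBucket(0, 5, true);                  // then the positive range, ascending indices
histo.tryAddBucket(1, 2, true);                  // fourth bucket fills the capacity, returns true
boolean added = histo.tryAddBucket(2, 1, true);  // capacity exhausted: returns false, histogram unchanged
```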
+ */ + abstract int startSlot(); + + final void reset() { + numBuckets = 0; + cachedValueSumForNumBuckets = 0; + cachedValueSum = 0; + } + + boolean tryAddBucket(long index, long count) { + int slot = startSlot() + numBuckets; + if (slot >= bucketCounts.length) { + return false; // no more space + } + bucketIndices[slot] = index; + bucketCounts[slot] = count; + numBuckets++; + return true; + } + + @Override + public CopyableBucketIterator iterator() { + int start = startSlot(); + return new BucketArrayIterator(start, start + numBuckets); + } + + @Override + public OptionalLong maxBucketIndex() { + if (numBuckets == 0) { + return OptionalLong.empty(); + } else { + return OptionalLong.of(bucketIndices[startSlot() + numBuckets - 1]); + } + } + + @Override + public long valueCount() { + int startSlot = startSlot(); + while (cachedValueSumForNumBuckets < numBuckets) { + cachedValueSum += bucketCounts[startSlot + cachedValueSumForNumBuckets]; + cachedValueSumForNumBuckets++; + } + return cachedValueSum; + } + } + + private class BucketArrayIterator implements CopyableBucketIterator { + + int current; + final int limit; + + private BucketArrayIterator(int start, int limit) { + this.current = start; + this.limit = limit; + } + + @Override + public boolean hasNext() { + return current < limit; + } + + @Override + public long peekCount() { + ensureEndNotReached(); + return bucketCounts[current]; + } + + @Override + public long peekIndex() { + ensureEndNotReached(); + return bucketIndices[current]; + } + + @Override + public void advance() { + ensureEndNotReached(); + current++; + } + + @Override + public int scale() { + return FixedCapacityExponentialHistogram.this.scale(); + } + + @Override + public CopyableBucketIterator copy() { + return new BucketArrayIterator(current, limit); + } + + private void ensureEndNotReached() { + if (hasNext() == false) { + throw new IllegalStateException("Iterator has no more buckets"); + } + } + } +} diff --git a/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/MergingBucketIterator.java b/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/MergingBucketIterator.java new file mode 100644 index 0000000000000..1ca660f62e879 --- /dev/null +++ b/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/MergingBucketIterator.java @@ -0,0 +1,97 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.exponentialhistogram; + +/** + * An iterator that merges two bucket iterators, aligning them to a common scale and combining buckets with the same index. + */ +final class MergingBucketIterator implements BucketIterator { + + private final BucketIterator itA; + private final BucketIterator itB; + + private boolean endReached; + private long currentIndex; + private long currentCount; + + /** + * Creates a new merging iterator. 
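A worked example of the merge semantics (an illustrative, same-package sketch; both inputs already use scale 0, so no rescaling happens):

```java
FixedCapacityExponentialHistogram a = new FixedCapacityExponentialHistogram(10);
a.resetBuckets(0);
a.tryAddBucket(1, 2, true);
a.tryAddBucket(3, 1, true);

FixedCapacityExponentialHistogram b = new FixedCapacityExponentialHistogram(10);
b.resetBuckets(0);
b.tryAddBucket(1, 4, true);
b.tryAddBucket(2, 5, true);

BucketIterator merged = new MergingBucketIterator(a.positiveBuckets().iterator(), b.positiveBuckets().iterator(), 0);
while (merged.hasNext()) {
    // prints (1,6) (2,5) (3,1): buckets with the same index are combined, ordering stays ascending
    System.out.println("(" + merged.peekIndex() + "," + merged.peekCount() + ")");
    merged.advance();
}
```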
+ * + * @param itA the first iterator to merge + * @param itB the second iterator to merge + * @param targetScale the histogram scale to which both iterators should be aligned + */ + MergingBucketIterator(BucketIterator itA, BucketIterator itB, int targetScale) { + this.itA = new ScaleAdjustingBucketIterator(itA, targetScale); + this.itB = new ScaleAdjustingBucketIterator(itB, targetScale); + endReached = false; + advance(); + } + + @Override + public void advance() { + boolean hasNextA = itA.hasNext(); + boolean hasNextB = itB.hasNext(); + endReached = hasNextA == false && hasNextB == false; + if (endReached) { + return; + } + long idxA = 0; + long idxB = 0; + if (hasNextA) { + idxA = itA.peekIndex(); + } + if (hasNextB) { + idxB = itB.peekIndex(); + } + + currentCount = 0; + boolean advanceA = hasNextA && (hasNextB == false || idxA <= idxB); + boolean advanceB = hasNextB && (hasNextA == false || idxB <= idxA); + if (advanceA) { + currentIndex = idxA; + currentCount += itA.peekCount(); + itA.advance(); + } + if (advanceB) { + currentIndex = idxB; + currentCount += itB.peekCount(); + itB.advance(); + } + } + + @Override + public boolean hasNext() { + return endReached == false; + } + + @Override + public long peekCount() { + assertEndNotReached(); + return currentCount; + } + + @Override + public long peekIndex() { + assertEndNotReached(); + return currentIndex; + } + + @Override + public int scale() { + return itA.scale(); + } + + private void assertEndNotReached() { + if (endReached) { + throw new IllegalStateException("Iterator has no more buckets"); + } + } +} diff --git a/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/ScaleAdjustingBucketIterator.java b/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/ScaleAdjustingBucketIterator.java new file mode 100644 index 0000000000000..54b4d2cb2b467 --- /dev/null +++ b/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/ScaleAdjustingBucketIterator.java @@ -0,0 +1,83 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.exponentialhistogram; + +import static org.elasticsearch.exponentialhistogram.ExponentialScaleUtils.adjustScale; + +/** + * An iterator that wraps another bucket iterator and adjusts its scale. + * When scaling down, multiple buckets can collapse into a single one. This iterator ensures they are merged correctly. + */ +final class ScaleAdjustingBucketIterator implements BucketIterator { + + private final BucketIterator delegate; + private final int scaleAdjustment; + + private long currentIndex; + private long currentCount; + boolean hasNextValue; + + /** + * Creates a new scale-adjusting iterator. 
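A worked example of the collapsing behaviour when scaling down (an illustrative, same-package sketch):

```java
FixedCapacityExponentialHistogram histo = new FixedCapacityExponentialHistogram(10);
histo.resetBuckets(1);             // source buckets at scale 1
histo.tryAddBucket(2, 3, true);
histo.tryAddBucket(3, 4, true);
histo.tryAddBucket(5, 1, true);

// Downscaling to scale 0 maps index -> index >> 1, so indices 2 and 3 collapse into index 1.
BucketIterator downscaled = new ScaleAdjustingBucketIterator(histo.positiveBuckets().iterator(), 0);
while (downscaled.hasNext()) {
    // prints (1,7) then (2,1)
    System.out.println("(" + downscaled.peekIndex() + "," + downscaled.peekCount() + ")");
    downscaled.advance();
}
```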
+ * + * @param delegate the iterator to wrap + * @param targetScale the target scale for the new iterator + */ + ScaleAdjustingBucketIterator(BucketIterator delegate, int targetScale) { + this.delegate = delegate; + scaleAdjustment = targetScale - delegate.scale(); + hasNextValue = true; + advance(); + } + + @Override + public boolean hasNext() { + return hasNextValue; + } + + @Override + public long peekCount() { + assertEndNotReached(); + return currentCount; + } + + @Override + public long peekIndex() { + assertEndNotReached(); + return currentIndex; + } + + @Override + public void advance() { + assertEndNotReached(); + hasNextValue = delegate.hasNext(); + if (hasNextValue == false) { + return; + } + currentIndex = adjustScale(delegate.peekIndex(), delegate.scale(), scaleAdjustment); + currentCount = delegate.peekCount(); + delegate.advance(); + while (delegate.hasNext() && adjustScale(delegate.peekIndex(), delegate.scale(), scaleAdjustment) == currentIndex) { + currentCount += delegate.peekCount(); + delegate.advance(); + } + } + + private void assertEndNotReached() { + if (hasNextValue == false) { + throw new IllegalStateException("Iterator has no more buckets"); + } + } + + @Override + public int scale() { + return delegate.scale() + scaleAdjustment; + } +} diff --git a/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/ZeroBucket.java b/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/ZeroBucket.java new file mode 100644 index 0000000000000..ef68b7599731e --- /dev/null +++ b/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/ZeroBucket.java @@ -0,0 +1,154 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.exponentialhistogram; + +import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_SCALE; +import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MIN_INDEX; +import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MIN_SCALE; +import static org.elasticsearch.exponentialhistogram.ExponentialScaleUtils.compareExponentiallyScaledValues; +import static org.elasticsearch.exponentialhistogram.ExponentialScaleUtils.computeIndex; +import static org.elasticsearch.exponentialhistogram.ExponentialScaleUtils.exponentiallyScaledToDoubleValue; + +/** + * Represents the bucket for values around zero in an exponential histogram. + * The range of this bucket is {@code [-zeroThreshold, +zeroThreshold]}. + * + * @param index The index used with the scale to determine the zero threshold. + * @param scale The scale used with the index to determine the zero threshold. + * @param count The number of values in the zero bucket. + */ +public record ZeroBucket(long index, int scale, long count) { + + // A singleton for an empty zero bucket with the smallest possible threshold. + private static final ZeroBucket MINIMAL_EMPTY = new ZeroBucket(MIN_INDEX, MIN_SCALE, 0); + + /** + * Creates a new zero bucket with a specific threshold and count. 
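For illustration, a small usage sketch of this constructor (the values are arbitrary):

```java
// Counts 5 values whose absolute value is at most (roughly) 0.001.
ZeroBucket zeroBucket = new ZeroBucket(0.001, 5);
double threshold = zeroBucket.zeroThreshold(); // at or just above 0.001, snapped to a bucket boundary at MAX_SCALE
long count = zeroBucket.count();               // 5
```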
+ * + * @param zeroThreshold The threshold defining the bucket's range [-zeroThreshold, +zeroThreshold]. + * @param count The number of values in the bucket. + */ + public ZeroBucket(double zeroThreshold, long count) { + this(computeIndex(zeroThreshold, MAX_SCALE) + 1, MAX_SCALE, count); + } + + /** + * @return A singleton instance of an empty zero bucket with the smallest possible threshold. + */ + public static ZeroBucket minimalEmpty() { + return MINIMAL_EMPTY; + } + + /** + * Creates a zero bucket with the smallest possible threshold and a given count. + * + * @param count The number of values in the bucket. + * @return A new {@link ZeroBucket}. + */ + public static ZeroBucket minimalWithCount(long count) { + if (count == 0) { + return MINIMAL_EMPTY; + } else { + return new ZeroBucket(MINIMAL_EMPTY.index, MINIMAL_EMPTY.scale(), count); + } + } + + /** + * Merges this zero bucket with another one. + *
+ * <ul>
+ *     <li>If the other zero bucket is empty, this instance is returned unchanged.</li>
+ *     <li>Otherwise, the zero threshold is increased if necessary (by taking the maximum of the two), and the counts are summed.</li>
+ * </ul>
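For example (an illustrative sketch of the behaviour described above):

```java
ZeroBucket a = new ZeroBucket(0.5, 10);
ZeroBucket b = new ZeroBucket(2.0, 3);
ZeroBucket merged = a.merge(b);
// merged.zeroThreshold() is approximately 2.0 (the larger threshold) and merged.count() == 13.
// Merging with an empty zero bucket (count == 0) returns this instance unchanged.
```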
+ * + * @param other The other zero bucket to merge with. + * @return A new {@link ZeroBucket} representing the merged result. + */ + public ZeroBucket merge(ZeroBucket other) { + if (other.count == 0) { + return this; + } else { + long totalCount = count + other.count; + // Both are populated, so we need to use the higher zero-threshold. + if (this.compareZeroThreshold(other) >= 0) { + return new ZeroBucket(index, scale, totalCount); + } else { + return new ZeroBucket(other.index, other.scale, totalCount); + } + } + } + + /** + * Collapses all buckets from the given iterators whose lower boundaries are smaller than the zero threshold. + * The iterators are advanced to point at the first, non-collapsed bucket. + * + * @param bucketIterators The iterators whose buckets may be collapsed. + * @return A potentially updated {@link ZeroBucket} with the collapsed buckets' counts and an adjusted threshold. + */ + public ZeroBucket collapseOverlappingBuckets(BucketIterator... bucketIterators) { + ZeroBucket current = this; + ZeroBucket previous; + do { + previous = current; + for (BucketIterator buckets : bucketIterators) { + current = current.collapseOverlappingBuckets(buckets); + } + } while (previous.compareZeroThreshold(current) != 0); + return current; + } + + /** + * Compares the zero threshold of this bucket with another one. + * + * @param other The other zero bucket to compare against. + * @return A negative integer, zero, or a positive integer if this bucket's threshold is less than, + * equal to, or greater than the other's. + */ + public int compareZeroThreshold(ZeroBucket other) { + return compareExponentiallyScaledValues(index, scale, other.index, other.scale); + } + + /** + * @return The value of the zero threshold. + */ + public double zeroThreshold() { + return exponentiallyScaledToDoubleValue(index, scale); + } + + /** + * Collapses all buckets from the given iterator whose lower boundaries are smaller than the zero threshold. + * The iterator is advanced to point at the first, non-collapsed bucket. + * + * @param buckets The iterator whose buckets may be collapsed. + * @return A potentially updated {@link ZeroBucket} with the collapsed buckets' counts and an adjusted threshold. + */ + public ZeroBucket collapseOverlappingBuckets(BucketIterator buckets) { + + long collapsedCount = 0; + long highestCollapsedIndex = 0; + while (buckets.hasNext() && compareExponentiallyScaledValues(buckets.peekIndex(), buckets.scale(), index, scale) < 0) { + highestCollapsedIndex = buckets.peekIndex(); + collapsedCount += buckets.peekCount(); + buckets.advance(); + } + if (collapsedCount == 0) { + return this; + } else { + long newZeroCount = count + collapsedCount; + // +1 because we need to adjust the zero threshold to the upper boundary of the collapsed bucket + long collapsedUpperBoundIndex = highestCollapsedIndex + 1; + if (compareExponentiallyScaledValues(index, scale, collapsedUpperBoundIndex, buckets.scale()) >= 0) { + // Our current zero-threshold is larger than the upper boundary of the largest collapsed bucket, so we keep it. 
+ return new ZeroBucket(index, scale, newZeroCount); + } else { + return new ZeroBucket(collapsedUpperBoundIndex, buckets.scale(), newZeroCount); + } + } + } +} diff --git a/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/DownscaleStatsTests.java b/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/DownscaleStatsTests.java new file mode 100644 index 0000000000000..86a32388c8f1b --- /dev/null +++ b/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/DownscaleStatsTests.java @@ -0,0 +1,76 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.exponentialhistogram; + +import org.elasticsearch.test.ESTestCase; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.stream.IntStream; +import java.util.stream.LongStream; + +import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_INDEX; +import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_INDEX_BITS; +import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MIN_INDEX; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +public class DownscaleStatsTests extends ESTestCase { + + public void testExponential() { + long[] values = IntStream.range(0, 100).mapToLong(i -> (long) Math.min(MAX_INDEX, Math.pow(1.1, i))).distinct().toArray(); + verifyFor(values); + } + + public void testNumericalLimits() { + verifyFor(MIN_INDEX, MAX_INDEX); + } + + public void testRandom() { + for (int i = 0; i < 100; i++) { + List values = IntStream.range(0, 1000).mapToObj(j -> randomLongBetween(MIN_INDEX, MAX_INDEX)).distinct().toList(); + verifyFor(values); + } + } + + void verifyFor(long... 
indices) { + verifyFor(LongStream.of(indices).boxed().toList()); + } + + void verifyFor(Collection indices) { + // sanity check, we require unique indices + assertThat(indices.size(), equalTo(new HashSet<>(indices).size())); + + List sorted = new ArrayList<>(indices); + sorted.sort(Long::compareTo); + + DownscaleStats stats = new DownscaleStats(); + for (int i = 1; i < sorted.size(); i++) { + long prev = sorted.get(i - 1); + long curr = sorted.get(i); + stats.add(prev, curr); + } + + for (int i = 0; i <= MAX_INDEX_BITS; i++) { + int scaleReduction = i; + long remainingCount = indices.stream().mapToLong(Long::longValue).map(index -> index >> scaleReduction).distinct().count(); + long reduction = sorted.size() - remainingCount; + + assertThat( + "Expected size after reduction of " + i + " to match", + stats.getCollapsedBucketCountAfterScaleReduction(scaleReduction), + equalTo((int) reduction) + ); + } + + } +} diff --git a/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramGeneratorTests.java b/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramGeneratorTests.java new file mode 100644 index 0000000000000..23ace7e861093 --- /dev/null +++ b/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramGeneratorTests.java @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.exponentialhistogram; + +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +public class ExponentialHistogramGeneratorTests extends ESTestCase { + + public void testVeryLargeValue() { + double value = Double.MAX_VALUE / 10; + ExponentialHistogram histo = ExponentialHistogramGenerator.createFor(value); + + long index = histo.positiveBuckets().iterator().peekIndex(); + int scale = histo.scale(); + + double lowerBound = ExponentialScaleUtils.getLowerBucketBoundary(index, scale); + double upperBound = ExponentialScaleUtils.getUpperBucketBoundary(index, scale); + + assertThat("Lower bucket boundary should be smaller than value", lowerBound, lessThanOrEqualTo(value)); + assertThat("Upper bucket boundary should be greater than value", upperBound, greaterThanOrEqualTo(value)); + } + +} diff --git a/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramMergerTests.java b/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramMergerTests.java new file mode 100644 index 0000000000000..89bb3b39a6746 --- /dev/null +++ b/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramMergerTests.java @@ -0,0 +1,167 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.exponentialhistogram; + +import org.elasticsearch.test.ESTestCase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_INDEX; +import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MIN_INDEX; +import static org.elasticsearch.exponentialhistogram.ExponentialScaleUtils.adjustScale; +import static org.hamcrest.Matchers.closeTo; +import static org.hamcrest.Matchers.equalTo; + +public class ExponentialHistogramMergerTests extends ESTestCase { + + public void testZeroThresholdCollapsesOverlappingBuckets() { + FixedCapacityExponentialHistogram first = new FixedCapacityExponentialHistogram(100); + first.setZeroBucket(new ZeroBucket(2.0001, 10)); + + FixedCapacityExponentialHistogram second = new FixedCapacityExponentialHistogram(100); + first.resetBuckets(0); // scale 0 means base 2 + first.tryAddBucket(0, 1, false); // bucket (-2, 1] + first.tryAddBucket(1, 1, false); // bucket (-4, 2] + first.tryAddBucket(2, 7, false); // bucket (-8, 4] + first.tryAddBucket(0, 1, true); // bucket (1, 2] + first.tryAddBucket(1, 1, true); // bucket (2, 4] + first.tryAddBucket(2, 42, true); // bucket (4, 8] + + ExponentialHistogram mergeResult = mergeWithMinimumScale(100, 0, first, second); + + assertThat(mergeResult.zeroBucket().zeroThreshold(), equalTo(4.0)); + assertThat(mergeResult.zeroBucket().count(), equalTo(14L)); + + // only the (4, 8] bucket should be left + assertThat(mergeResult.scale(), equalTo(0)); + + BucketIterator negBuckets = mergeResult.negativeBuckets().iterator(); + assertThat(negBuckets.peekIndex(), equalTo(2L)); + assertThat(negBuckets.peekCount(), equalTo(7L)); + negBuckets.advance(); + assertThat(negBuckets.hasNext(), equalTo(false)); + + BucketIterator posBuckets = mergeResult.positiveBuckets().iterator(); + assertThat(posBuckets.peekIndex(), equalTo(2L)); + assertThat(posBuckets.peekCount(), equalTo(42L)); + posBuckets.advance(); + assertThat(posBuckets.hasNext(), equalTo(false)); + + // ensure buckets of the accumulated histogram are collapsed too if needed + FixedCapacityExponentialHistogram third = new FixedCapacityExponentialHistogram(100); + third.setZeroBucket(new ZeroBucket(45.0, 1)); + + mergeResult = mergeWithMinimumScale(100, 0, mergeResult, third); + assertThat(mergeResult.zeroBucket().zeroThreshold(), closeTo(45.0, 0.000001)); + assertThat(mergeResult.zeroBucket().count(), equalTo(1L + 14L + 42L + 7L)); + assertThat(mergeResult.positiveBuckets().iterator().hasNext(), equalTo(false)); + assertThat(mergeResult.negativeBuckets().iterator().hasNext(), equalTo(false)); + } + + public void testEmptyZeroBucketIgnored() { + FixedCapacityExponentialHistogram first = new FixedCapacityExponentialHistogram(100); + first.setZeroBucket(new ZeroBucket(2.0, 10)); + first.resetBuckets(0); // scale 0 means base 2 + first.tryAddBucket(2, 42L, true); // bucket (4, 8] + + FixedCapacityExponentialHistogram second = new FixedCapacityExponentialHistogram(100); + second.setZeroBucket(new 
ZeroBucket(100.0, 0)); + + ExponentialHistogram mergeResult = mergeWithMinimumScale(100, 0, first, second); + + assertThat(mergeResult.zeroBucket().zeroThreshold(), equalTo(2.0)); + assertThat(mergeResult.zeroBucket().count(), equalTo(10L)); + + BucketIterator posBuckets = mergeResult.positiveBuckets().iterator(); + assertThat(posBuckets.peekIndex(), equalTo(2L)); + assertThat(posBuckets.peekCount(), equalTo(42L)); + posBuckets.advance(); + assertThat(posBuckets.hasNext(), equalTo(false)); + } + + public void testUpscalingDoesNotExceedIndexLimits() { + for (int i = 0; i < 4; i++) { + + boolean isPositive = i % 2 == 0; + boolean useMinIndex = i > 1; + + FixedCapacityExponentialHistogram histo = new FixedCapacityExponentialHistogram(2); + histo.resetBuckets(20); + + long index = useMinIndex ? MIN_INDEX / 2 : MAX_INDEX / 2; + + histo.tryAddBucket(index, 1, isPositive); + + ExponentialHistogramMerger merger = new ExponentialHistogramMerger(100); + merger.add(histo); + ExponentialHistogram result = merger.get(); + + assertThat(result.scale(), equalTo(21)); + if (isPositive) { + assertThat(result.positiveBuckets().iterator().peekIndex(), equalTo(adjustScale(index, 20, 1))); + } else { + assertThat(result.negativeBuckets().iterator().peekIndex(), equalTo(adjustScale(index, 20, 1))); + } + } + } + + /** + * Verify that the resulting histogram is independent of the order of elements and therefore merges performed. + */ + public void testMergeOrderIndependence() { + List values = IntStream.range(0, 10_000) + .mapToDouble(i -> i < 17 ? 0 : (-1 + 2 * randomDouble()) * Math.pow(10, randomIntBetween(-4, 4))) + .boxed() + .collect(Collectors.toCollection(ArrayList::new)); + + ExponentialHistogram reference = ExponentialHistogramGenerator.createFor( + 20, + values.stream().mapToDouble(Double::doubleValue).toArray() + ); + + for (int i = 0; i < 100; i++) { + Collections.shuffle(values, random()); + ExponentialHistogram shuffled = ExponentialHistogramGenerator.createFor( + 20, + values.stream().mapToDouble(Double::doubleValue).toArray() + ); + + assertThat("Expected same scale", shuffled.scale(), equalTo(reference.scale())); + assertThat("Expected same zero-bucket", shuffled.zeroBucket(), equalTo(reference.zeroBucket())); + assertBucketsEqual(shuffled.negativeBuckets(), reference.negativeBuckets()); + assertBucketsEqual(shuffled.positiveBuckets(), reference.positiveBuckets()); + } + } + + private void assertBucketsEqual(ExponentialHistogram.Buckets bucketsA, ExponentialHistogram.Buckets bucketsB) { + BucketIterator itA = bucketsA.iterator(); + BucketIterator itB = bucketsB.iterator(); + assertThat("Expecting both set of buckets to be empty or non-empty", itA.hasNext(), equalTo(itB.hasNext())); + while (itA.hasNext() && itB.hasNext()) { + assertThat(itA.peekIndex(), equalTo(itB.peekIndex())); + assertThat(itA.peekCount(), equalTo(itB.peekCount())); + assertThat("The number of buckets is different", itA.hasNext(), equalTo(itB.hasNext())); + itA.advance(); + itB.advance(); + } + } + + private static ExponentialHistogram mergeWithMinimumScale(int bucketCount, int scale, ExponentialHistogram... 
histograms) { + ExponentialHistogramMerger merger = ExponentialHistogramMerger.createForTesting(bucketCount, scale); + Arrays.stream(histograms).forEach(merger::add); + return merger.get(); + } + +} diff --git a/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ExponentialScaleUtilsTests.java b/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ExponentialScaleUtilsTests.java new file mode 100644 index 0000000000000..c84553dd6d825 --- /dev/null +++ b/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ExponentialScaleUtilsTests.java @@ -0,0 +1,199 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.exponentialhistogram; + +import ch.obermuhlner.math.big.BigDecimalMath; + +import org.elasticsearch.test.ESTestCase; + +import java.math.BigDecimal; +import java.math.MathContext; +import java.math.RoundingMode; + +import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_INDEX; +import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_INDEX_BITS; +import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_SCALE; +import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MIN_INDEX; +import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MIN_SCALE; +import static org.elasticsearch.exponentialhistogram.ExponentialScaleUtils.SCALE_UP_CONSTANT_TABLE; +import static org.elasticsearch.exponentialhistogram.ExponentialScaleUtils.adjustScale; +import static org.elasticsearch.exponentialhistogram.ExponentialScaleUtils.compareExponentiallyScaledValues; +import static org.elasticsearch.exponentialhistogram.ExponentialScaleUtils.computeIndex; +import static org.elasticsearch.exponentialhistogram.ExponentialScaleUtils.getLowerBucketBoundary; +import static org.elasticsearch.exponentialhistogram.ExponentialScaleUtils.getMaximumScaleIncrease; +import static org.elasticsearch.exponentialhistogram.ExponentialScaleUtils.getPointOfLeastRelativeError; +import static org.elasticsearch.exponentialhistogram.ExponentialScaleUtils.getUpperBucketBoundary; +import static org.hamcrest.Matchers.closeTo; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +public class ExponentialScaleUtilsTests extends ESTestCase { + + public void testMaxIndex() { + assertThat(getMaximumScaleIncrease(MAX_INDEX), equalTo(0)); + assertThat(getMaximumScaleIncrease(MAX_INDEX - 1), equalTo(0)); + assertThat(getMaximumScaleIncrease(MAX_INDEX >> 1), equalTo(1)); + assertThrows(ArithmeticException.class, () -> Math.multiplyExact(MAX_INDEX, 4)); + } + + public void testMinIndex() { + assertThat(getMaximumScaleIncrease(MIN_INDEX), equalTo(0)); + assertThat(getMaximumScaleIncrease(MIN_INDEX + 1), equalTo(0)); + assertThat(getMaximumScaleIncrease(MIN_INDEX >> 1), equalTo(0)); + assertThat(getMaximumScaleIncrease((MIN_INDEX 
+ 1) >> 1), equalTo(1)); + assertThrows(ArithmeticException.class, () -> Math.multiplyExact(MIN_INDEX, 4)); + } + + public void testExtremeValueIndexing() { + double leeway = Math.pow(10.0, 20); + + for (double testValue : new double[] { Double.MAX_VALUE / leeway, Double.MIN_VALUE * leeway }) { + long idx = computeIndex(testValue, MAX_SCALE); + double lowerBound = getLowerBucketBoundary(idx, MAX_SCALE); + double upperBound = getUpperBucketBoundary(idx, MAX_SCALE); + assertThat(lowerBound, lessThanOrEqualTo(testValue)); + assertThat(upperBound, greaterThanOrEqualTo(testValue)); + assertThat(lowerBound, lessThan(upperBound)); + } + } + + public void testRandomValueIndexing() { + for (int i = 0; i < 100_000; i++) { + // generate values in the range 10^-100 to 10^100 + double exponent = randomDouble() * 200 - 100; + double testValue = Math.pow(10, exponent); + int scale = randomIntBetween(MIN_SCALE / 2, MAX_SCALE / 2); + long index = computeIndex(testValue, scale); + + double lowerBound = getLowerBucketBoundary(index, scale); + double upperBound = getUpperBucketBoundary(index, scale); + double pointOfLeastError = getPointOfLeastRelativeError(index, scale); + + String baseMsg = " for input value " + testValue + " and scale " + scale; + + assertThat("Expected lower bound to be less than input value", lowerBound, lessThanOrEqualTo(testValue)); + assertThat("Expected upper bound to be greater than input value", upperBound, greaterThanOrEqualTo(upperBound)); + assertThat("Expected lower bound to be less than upper bound" + baseMsg, lowerBound, lessThan(upperBound)); + + // only do this check for ranges where we have enough numeric stability + if (lowerBound > Math.pow(10, -250) && upperBound < Math.pow(10, 250)) { + + assertThat( + "Expected point of least error to be greater than lower bound" + baseMsg, + pointOfLeastError, + greaterThan(lowerBound) + ); + assertThat("Expected point of least error to be less than upper bound" + baseMsg, pointOfLeastError, lessThan(upperBound)); + + double errorLower = (pointOfLeastError - lowerBound) / lowerBound; + double errorUpper = (upperBound - pointOfLeastError) / upperBound; + assertThat(errorLower / errorUpper, closeTo(1, 0.1)); + } + + } + } + + public void testRandomIndicesScaleAdjustement() { + + for (int i = 0; i < 100_000; i++) { + long index = randomLongBetween(MIN_INDEX, MAX_INDEX); + int currentScale = randomIntBetween(MIN_SCALE, MAX_SCALE); + int maxAdjustment = Math.min(MAX_SCALE - currentScale, getMaximumScaleIncrease(index)); + + assertThat( + adjustScale(adjustScale(index, currentScale, maxAdjustment), currentScale + maxAdjustment, -maxAdjustment), + equalTo(index) + ); + if (currentScale + maxAdjustment < MAX_SCALE) { + if (index > 0) { + assertThat(adjustScale(index, currentScale, maxAdjustment) * 2, greaterThan(MAX_INDEX)); + } else if (index < 0) { + assertThat(adjustScale(index, currentScale, maxAdjustment) * 2, lessThan(MIN_INDEX)); + } + } + } + + } + + public void testRandomBucketBoundaryComparison() { + + for (int i = 0; i < 100_000; i++) { + long indexA = randomLongBetween(MIN_INDEX, MAX_INDEX); + long indexB = randomLongBetween(MIN_INDEX, MAX_INDEX); + int scaleA = randomIntBetween(MIN_SCALE, MAX_SCALE); + int scaleB = randomIntBetween(MIN_SCALE, MAX_SCALE); + + double lowerBoundA = getLowerBucketBoundary(indexA, scaleA); + while (Double.isInfinite(lowerBoundA)) { + indexA = indexA >> 1; + lowerBoundA = getLowerBucketBoundary(indexA, scaleA); + } + double lowerBoundB = getLowerBucketBoundary(indexB, scaleB); + while 
(Double.isInfinite(lowerBoundB)) { + indexB = indexB >> 1; + lowerBoundB = getLowerBucketBoundary(indexB, scaleB); + } + + if (lowerBoundA != lowerBoundB) { + assertThat( + Double.compare(lowerBoundA, lowerBoundB), + equalTo(compareExponentiallyScaledValues(indexA, scaleA, indexB, scaleB)) + ); + } + } + } + + public void testUpscalingAccuracy() { + // Use slightly adjusted scales to not run into numeric trouble, because we don't use exact maths here + int minScale = MIN_SCALE + 7; + int maxScale = MAX_SCALE - 15; + + for (int i = 0; i < 10_000; i++) { + + int startScale = randomIntBetween(minScale, maxScale - 1); + int scaleIncrease = randomIntBetween(1, maxScale - startScale); + + long index = MAX_INDEX >> scaleIncrease >> (int) (randomDouble() * (MAX_INDEX_BITS - scaleIncrease)); + index = Math.max(1, index); + index = (long) ((2 * randomDouble() - 1) * index); + + double midPoint = getPointOfLeastRelativeError(index, startScale); + // limit the numeric range, otherwise we get rounding errors causing the test to fail + while (midPoint > Math.pow(10, 10) || midPoint < Math.pow(10, -10)) { + index /= 2; + midPoint = getPointOfLeastRelativeError(index, startScale); + } + + long scaledUpIndex = adjustScale(index, startScale, scaleIncrease); + long correctIdx = computeIndex(midPoint, startScale + scaleIncrease); + // Due to rounding problems in the tests, we can still be off by one for extreme scales + assertThat(scaledUpIndex, equalTo(correctIdx)); + } + } + + public void testScaleUpTableUpToDate() { + + MathContext mc = new MathContext(1000); + BigDecimal one = new BigDecimal(1, mc); + BigDecimal two = new BigDecimal(2, mc); + + for (int scale = MIN_SCALE; scale <= MAX_SCALE; scale++) { + BigDecimal base = BigDecimalMath.pow(two, two.pow(-scale, mc), mc); + BigDecimal factor = one.add(two.pow(scale, mc).multiply(one.subtract(BigDecimalMath.log2(one.add(base), mc)))); + + BigDecimal scaledFactor = factor.multiply(two.pow(63, mc)).setScale(0, RoundingMode.FLOOR); + assertThat(SCALE_UP_CONSTANT_TABLE[scale - MIN_SCALE], equalTo(scaledFactor.longValue())); + } + } + +} diff --git a/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/FixedCapacityExponentialHistogramTests.java b/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/FixedCapacityExponentialHistogramTests.java new file mode 100644 index 0000000000000..b3a698b2ccd1a --- /dev/null +++ b/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/FixedCapacityExponentialHistogramTests.java @@ -0,0 +1,48 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.exponentialhistogram; + +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.equalTo; + +public class FixedCapacityExponentialHistogramTests extends ESTestCase { + + public void testValueCountUpdatedCorrectly() { + + FixedCapacityExponentialHistogram histogram = new FixedCapacityExponentialHistogram(100); + + assertThat(histogram.negativeBuckets().valueCount(), equalTo(0L)); + assertThat(histogram.positiveBuckets().valueCount(), equalTo(0L)); + + histogram.tryAddBucket(1, 10, false); + + assertThat(histogram.negativeBuckets().valueCount(), equalTo(10L)); + assertThat(histogram.positiveBuckets().valueCount(), equalTo(0L)); + + histogram.tryAddBucket(2, 3, false); + histogram.tryAddBucket(3, 4, false); + histogram.tryAddBucket(1, 5, true); + + assertThat(histogram.negativeBuckets().valueCount(), equalTo(17L)); + assertThat(histogram.positiveBuckets().valueCount(), equalTo(5L)); + + histogram.tryAddBucket(2, 3, true); + histogram.tryAddBucket(3, 4, true); + + assertThat(histogram.negativeBuckets().valueCount(), equalTo(17L)); + assertThat(histogram.positiveBuckets().valueCount(), equalTo(12L)); + + histogram.resetBuckets(0); + + assertThat(histogram.negativeBuckets().valueCount(), equalTo(0L)); + assertThat(histogram.positiveBuckets().valueCount(), equalTo(0L)); + } +} diff --git a/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/QuantileAccuracyTests.java b/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/QuantileAccuracyTests.java new file mode 100644 index 0000000000000..a1bbaf174de76 --- /dev/null +++ b/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/QuantileAccuracyTests.java @@ -0,0 +1,288 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.exponentialhistogram; + +import org.apache.commons.math3.distribution.BetaDistribution; +import org.apache.commons.math3.distribution.ExponentialDistribution; +import org.apache.commons.math3.distribution.GammaDistribution; +import org.apache.commons.math3.distribution.LogNormalDistribution; +import org.apache.commons.math3.distribution.NormalDistribution; +import org.apache.commons.math3.distribution.RealDistribution; +import org.apache.commons.math3.distribution.UniformRealDistribution; +import org.apache.commons.math3.distribution.WeibullDistribution; +import org.apache.commons.math3.random.Well19937c; +import org.elasticsearch.test.ESTestCase; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Locale; +import java.util.stream.DoubleStream; +import java.util.stream.IntStream; + +import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_SCALE; +import static org.elasticsearch.exponentialhistogram.ExponentialScaleUtils.computeIndex; +import static org.hamcrest.Matchers.closeTo; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.notANumber; + +public class QuantileAccuracyTests extends ESTestCase { + + public static final double[] QUANTILES_TO_TEST = { 0, 0.0000001, 0.01, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99, 0.999999, 1.0 }; + + private static int randomBucketCount() { + // exponentially distribute the bucket count to test more for smaller sizes + return (int) Math.round(5 + Math.pow(1995, randomDouble())); + } + + public void testUniformDistribution() { + testDistributionQuantileAccuracy(new UniformRealDistribution(new Well19937c(randomInt()), 0, 100)); + } + + public void testNormalDistribution() { + testDistributionQuantileAccuracy(new NormalDistribution(new Well19937c(randomInt()), 100, 15)); + } + + public void testExponentialDistribution() { + testDistributionQuantileAccuracy(new ExponentialDistribution(new Well19937c(randomInt()), 10)); + } + + public void testLogNormalDistribution() { + testDistributionQuantileAccuracy(new LogNormalDistribution(new Well19937c(randomInt()), 0, 1)); + } + + public void testGammaDistribution() { + testDistributionQuantileAccuracy(new GammaDistribution(new Well19937c(randomInt()), 2, 5)); + } + + public void testBetaDistribution() { + testDistributionQuantileAccuracy(new BetaDistribution(new Well19937c(randomInt()), 2, 5)); + } + + public void testWeibullDistribution() { + testDistributionQuantileAccuracy(new WeibullDistribution(new Well19937c(randomInt()), 2, 5)); + } + + public void testBasicSmall() { + DoubleStream values = IntStream.range(1, 10).mapToDouble(Double::valueOf); + double maxError = testQuantileAccuracy(values.toArray(), 100); + assertThat(maxError, lessThan(0.000001)); + } + + public void testPercentileOverlapsZeroBucket() { + ExponentialHistogram histo = ExponentialHistogramGenerator.createFor(-3.0, -2, -1, 0, 0, 0, 1, 2, 3); + assertThat(ExponentialHistogramQuantile.getQuantile(histo, 8.0 / 16.0), equalTo(0.0)); + assertThat(ExponentialHistogramQuantile.getQuantile(histo, 7.0 / 16.0), equalTo(0.0)); + assertThat(ExponentialHistogramQuantile.getQuantile(histo, 9.0 / 16.0), equalTo(0.0)); + assertThat(ExponentialHistogramQuantile.getQuantile(histo, 5.0 / 16.0), closeTo(-0.5, 0.000001)); + assertThat(ExponentialHistogramQuantile.getQuantile(histo, 11.0 / 16.0), closeTo(0.5, 0.000001)); + } + + public void testBigJump() { + 
double[] values = DoubleStream.concat(IntStream.range(0, 18).mapToDouble(Double::valueOf), DoubleStream.of(1_000_000.0)).toArray(); + + double maxError = testQuantileAccuracy(values, 500); + assertThat(maxError, lessThan(0.000001)); + } + + public void testExplicitSkewedData() { + double[] data = new double[] { + 245, + 246, + 247.249, + 240, + 243, + 248, + 250, + 241, + 244, + 245, + 245, + 247, + 243, + 242, + 241, + 50100, + 51246, + 52247, + 52249, + 51240, + 53243, + 59248, + 59250, + 57241, + 56244, + 55245, + 56245, + 575247, + 58243, + 51242, + 54241 }; + + double maxError = testQuantileAccuracy(data, data.length / 2); + assertThat(maxError, lessThan(0.007)); + } + + public void testEmptyHistogram() { + ExponentialHistogram histo = ExponentialHistogramGenerator.createFor(); + for (double q : QUANTILES_TO_TEST) { + assertThat(ExponentialHistogramQuantile.getQuantile(histo, q), notANumber()); + } + } + + public void testSingleValueHistogram() { + ExponentialHistogram histo = ExponentialHistogramGenerator.createFor(42.0); + for (double q : QUANTILES_TO_TEST) { + assertThat(ExponentialHistogramQuantile.getQuantile(histo, q), closeTo(42, 0.0000001)); + } + } + + public void testBucketCountImpact() { + RealDistribution distribution = new LogNormalDistribution(new Well19937c(randomInt()), 0, 1); + int sampleSize = between(100, 50_000); + double[] values = generateSamples(distribution, sampleSize); + + // Verify that more buckets generally means better accuracy + double errorWithFewBuckets = testQuantileAccuracy(values, 20); + double errorWithManyBuckets = testQuantileAccuracy(values, 200); + assertThat("More buckets should improve accuracy", errorWithManyBuckets, lessThanOrEqualTo(errorWithFewBuckets)); + } + + public void testMixedSignValues() { + double[] values = new double[between(100, 10_000)]; + for (int i = 0; i < values.length; i++) { + values[i] = (randomDouble() * 200) - 100; // Range from -100 to 100 + } + + testQuantileAccuracy(values, 100); + } + + public void testSkewedData() { + // Create a highly skewed dataset + double[] values = new double[10000]; + for (int i = 0; i < values.length; i++) { + if (randomDouble() < 0.9) { + // 90% of values are small + values[i] = randomDouble() * 10; + } else { + // 10% are very large + values[i] = randomDouble() * 10000 + 100; + } + } + + testQuantileAccuracy(values, 100); + } + + public void testDataWithZeros() { + double[] values = new double[10000]; + for (int i = 0; i < values.length; i++) { + if (randomDouble() < 0.2) { + // 20% zeros + values[i] = 0; + } else { + values[i] = randomDouble() * 100; + } + } + + testQuantileAccuracy(values, 100); + } + + private void testDistributionQuantileAccuracy(RealDistribution distribution) { + double[] values = generateSamples(distribution, between(100, 50_000)); + int bucketCount = randomBucketCount(); + testQuantileAccuracy(values, bucketCount); + } + + private static double[] generateSamples(RealDistribution distribution, int sampleSize) { + double[] values = new double[sampleSize]; + for (int i = 0; i < sampleSize; i++) { + values[i] = distribution.sample(); + } + return values; + } + + private double testQuantileAccuracy(double[] values, int bucketCount) { + // Create histogram + ExponentialHistogram histogram = ExponentialHistogramGenerator.createFor(bucketCount, values); + Arrays.sort(values); + + double allowedError = getMaximumRelativeError(values, bucketCount); + double maxError = 0; + + // Compare histogram quantiles with exact quantiles + for (double q : QUANTILES_TO_TEST) { + 
+            double percentileRank = q * (values.length - 1);
+            int lowerRank = (int) Math.floor(percentileRank);
+            int upperRank = (int) Math.ceil(percentileRank);
+            double upperFactor = percentileRank - lowerRank;
+
+            if (values[lowerRank] < 0 && values[upperRank] > 0) {
+                // the percentile lies exactly at a sign change, so the exact value is interpolated linearly
+                // between a negative and a positive value; in this case the relative error bound does not hold
+                continue;
+            }
+            double exactValue = values[lowerRank] * (1 - upperFactor) + values[upperRank] * upperFactor;
+
+            double histoValue = ExponentialHistogramQuantile.getQuantile(histogram, q);
+
+            // Skip the comparison if the exact value is close to zero to avoid false positives due to numerical imprecision
+            if (Math.abs(exactValue) < 1e-100) {
+                continue;
+            }
+
+            double relativeError = Math.abs(histoValue - exactValue) / Math.abs(exactValue);
+            maxError = Math.max(maxError, relativeError);
+
+            assertThat(
+                String.format(Locale.ENGLISH, "Quantile %.2f should be accurate within %.6f%% relative error", q, allowedError * 100),
+                histoValue,
+                closeTo(exactValue, Math.abs(exactValue * allowedError))
+            );
+
+        }
+        return maxError;
+    }
+
+    /**
+     * Provides the upper bound of the relative error for any percentile estimate performed with the exponential histogram.
+     * The error depends on the raw values put into the histogram and the number of buckets allowed.
+     * This is an implementation of the error bound computation proven by Theorem 3 in the UDDSketch paper.
+     */
+    private static double getMaximumRelativeError(double[] values, int bucketCount) {
+        HashSet<Long> usedPositiveIndices = new HashSet<>();
+        HashSet<Long> usedNegativeIndices = new HashSet<>();
+        int bestPossibleScale = MAX_SCALE;
+        for (double value : values) {
+            if (value < 0) {
+                usedNegativeIndices.add(computeIndex(value, bestPossibleScale));
+            } else if (value > 0) {
+                usedPositiveIndices.add(computeIndex(value, bestPossibleScale));
+            }
+            while ((usedNegativeIndices.size() + usedPositiveIndices.size()) > bucketCount) {
+                usedNegativeIndices = rightShiftAll(usedNegativeIndices);
+                usedPositiveIndices = rightShiftAll(usedPositiveIndices);
+                bestPossibleScale--;
+            }
+        }
+        // for the best possible scale, compute the worst-case relative error
+        double base = Math.pow(2.0, Math.scalb(1.0, -bestPossibleScale));
+        return 2 * base / (1 + base) - 1;
+    }
+
+    private static HashSet<Long> rightShiftAll(HashSet<Long> indices) {
+        HashSet<Long> result = new HashSet<>();
+        for (long index : indices) {
+            result.add(index >> 1);
+        }
+        return result;
+    }
+
+}
diff --git a/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ZeroBucketTests.java b/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ZeroBucketTests.java
new file mode 100644
index 0000000000000..6fc4a1b6671a8
--- /dev/null
+++ b/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ZeroBucketTests.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */ + +package org.elasticsearch.exponentialhistogram; + +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.equalTo; + +public class ZeroBucketTests extends ESTestCase { + + public void testMinimalBucketHasZeroThreshold() { + assertThat(ZeroBucket.minimalWithCount(42).zeroThreshold(), equalTo(0.0)); + } +}
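
Note on the bound used in getMaximumRelativeError above: the returned value 2 * base / (1 + base) - 1 is the DDSketch/UDDSketch accuracy relation alpha = (gamma - 1) / (gamma + 1), where gamma is the bucket growth factor of a base-2 exponential histogram at the given scale, i.e. 2^(2^-scale). The following is a minimal, standalone sketch (illustrative only, not part of this change; the class name is made up) that tabulates this bound for a few scales:

// Illustrative sketch: relation between scale, bucket growth factor and the
// worst-case relative error bound computed in getMaximumRelativeError().
public class ErrorBoundDemo {
    public static void main(String[] args) {
        for (int scale = 0; scale <= 10; scale++) {
            // bucket growth factor at this scale: 2^(2^-scale)
            double base = Math.pow(2.0, Math.scalb(1.0, -scale));
            // worst-case relative error: 2*base/(1+base) - 1 == (base - 1) / (base + 1)
            double errorBound = 2 * base / (1 + base) - 1;
            System.out.printf("scale=%2d base=%.8f maxRelativeError=%.8f%n", scale, base, errorBound);
        }
    }
}

As expected, each increment of the scale roughly halves the worst-case relative error, which is why the test derives the best achievable scale from the number of distinct bucket indices before applying the bound.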