Skip to content

Commit 7d71255

Browse files
garaud authored
and nicenatalia committed
Adapt the plugin to Elastic 8
Thanks to @nicenatalia

Co-authored-by: Natalia Escalera <[email protected]>
1 parent 513811e commit 7d71255

File tree

7 files changed

+305
-261
lines changed

7 files changed

+305
-261
lines changed

src/main/java/org/opendatasoft/elasticsearch/search/aggregations/bucket/DateHierarchyAggregationBuilder.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.opendatasoft.elasticsearch.search.aggregations.bucket;
22

3-
import org.elasticsearch.Version;
3+
import org.elasticsearch.TransportVersion;
4+
import org.elasticsearch.TransportVersions;
45
import org.elasticsearch.common.Rounding;
56
import org.elasticsearch.common.io.stream.StreamInput;
67
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -40,6 +41,11 @@
4041
* The builder of the aggregatorFactory. Also implements the parsing of the request.
4142
*/
4243
public class DateHierarchyAggregationBuilder extends ValuesSourceAggregationBuilder<DateHierarchyAggregationBuilder> {
44+
@Override
45+
public TransportVersion getMinimalSupportedVersion() {
46+
return TransportVersions.V_8_0_0;
47+
}
48+
4349
public static final String NAME = "date_hierarchy";
4450
public static final ValuesSourceRegistry.RegistryKey<DateHierarchyAggregationSupplier> REGISTRY_KEY =
4551
new ValuesSourceRegistry.RegistryKey<>(NAME, DateHierarchyAggregationSupplier.class);
@@ -140,8 +146,8 @@ public void writeTo(StreamOutput out) throws IOException {
140146
DateHierarchyAggregationBuilder::new
141147
);
142148
static {
143-
144-
ValuesSourceAggregationBuilder.declareFields(PARSER, true, true, true);
149+
// ES 8.x introduces field validation. Setting timezoneAware to false to avoid duplication of the timezone field
150+
ValuesSourceAggregationBuilder.declareFields(PARSER, true, true, false);
145151

146152
PARSER.declareString(DateHierarchyAggregationBuilder::interval, INTERVAL_FIELD);
147153

@@ -180,7 +186,7 @@ private DateHierarchyAggregationBuilder(String name) {
180186
}
181187

182188
@Override
183-
protected boolean serializeTargetValueType(Version version) {
189+
protected boolean serializeTargetValueType(TransportVersion version) {
184190
return true;
185191
}
186192

@@ -412,9 +418,4 @@ public boolean equals(Object obj) {
412418
public String getType() {
413419
return NAME;
414420
}
415-
416-
@Override
417-
protected ValuesSourceRegistry.RegistryKey<?> getRegistryKey() {
418-
return REGISTRY_KEY;
419-
}
420421
}

src/main/java/org/opendatasoft/elasticsearch/search/aggregations/bucket/DateHierarchyAggregator.java

Lines changed: 63 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
package org.opendatasoft.elasticsearch.search.aggregations.bucket;
22

3-
import org.apache.lucene.index.LeafReaderContext;
43
import org.apache.lucene.index.SortedNumericDocValues;
54
import org.apache.lucene.util.BytesRef;
65
import org.elasticsearch.ElasticsearchException;
7-
import org.elasticsearch.common.Rounding;
86
import org.elasticsearch.common.io.stream.StreamInput;
97
import org.elasticsearch.common.io.stream.StreamOutput;
108
import org.elasticsearch.common.io.stream.Writeable;
119
import org.elasticsearch.common.util.BytesRefHash;
10+
import org.elasticsearch.common.util.LongArray;
11+
import org.elasticsearch.common.util.ObjectArray;
1212
import org.elasticsearch.core.Releasables;
13+
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
1314
import org.elasticsearch.search.aggregations.Aggregator;
1415
import org.elasticsearch.search.aggregations.AggregatorFactories;
1516
import org.elasticsearch.search.aggregations.BucketOrder;
@@ -25,7 +26,6 @@
2526

2627
import java.io.IOException;
2728
import java.util.Arrays;
28-
import java.util.Comparator;
2929
import java.util.Iterator;
3030
import java.util.List;
3131
import java.util.Map;
@@ -54,7 +54,6 @@ public DateHierarchyAggregator(
5454
this.bucketCountThresholds = bucketCountThresholds;
5555
order.validate(this);
5656
this.order = order;
57-
this.partiallyBuiltBucketComparator = order == null ? null : order.partiallyBuiltBucketComparator(b -> b.bucketOrd, this);
5857
}
5958

6059
public static class BucketCountThresholds implements Writeable, ToXContentFragment {
@@ -144,7 +143,6 @@ public boolean equals(Object obj) {
144143
private final long minDocCount;
145144
private final BucketCountThresholds bucketCountThresholds;
146145
private final List<DateHierarchyAggregationBuilder.PreparedRounding> preparedRoundings;
147-
protected final Comparator<InternalPathHierarchy.InternalBucket> partiallyBuiltBucketComparator;
148146

149147
/**
150148
* The collector collects the docs, including or not some score (depending of the including of a Scorer) in the
@@ -153,11 +151,11 @@ public boolean equals(Object obj) {
153151
* The LeafBucketCollector is a "Per-leaf bucket collector". It collects docs for the account of buckets.
154152
*/
155153
@Override
156-
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
154+
public LeafBucketCollector getLeafCollector(AggregationExecutionContext ctx, LeafBucketCollector sub) throws IOException {
157155
if (valuesSource == null) {
158156
return LeafBucketCollector.NO_OP_COLLECTOR;
159157
}
160-
final SortedNumericDocValues values = valuesSource.longValues(ctx);
158+
final SortedNumericDocValues values = valuesSource.longValues(ctx.getLeafReaderContext());
161159

162160
return new LeafBucketCollectorBase(sub, values) {
163161

@@ -189,63 +187,76 @@ public void collect(int doc, long bucket) throws IOException {
189187
}
190188

191189
@Override
192-
public InternalAggregation[] buildAggregations(long[] owningBucketOrdinals) throws IOException {
190+
public InternalAggregation[] buildAggregations(LongArray owningBucketOrdinals) throws IOException {
193191

194-
InternalDateHierarchy.InternalBucket[][] topBucketsPerOrd = new InternalDateHierarchy.InternalBucket[owningBucketOrdinals.length][];
195-
InternalDateHierarchy[] results = new InternalDateHierarchy[owningBucketOrdinals.length];
192+
// InternalDateHierarchy.InternalBucket[][] topBucketsPerOrd = new
193+
// InternalDateHierarchy.InternalBucket[owningBucketOrdinals.length][];
194+
// InternalDateHierarchy[] results = new InternalDateHierarchy[owningBucketOrdinals.length];
196195

197-
for (int ordIdx = 0; ordIdx < owningBucketOrdinals.length; ordIdx++) {
198-
assert owningBucketOrdinals[ordIdx] == 0;
196+
try (
197+
ObjectArray<InternalDateHierarchy.InternalBucket[]> topBucketsPerOrd = bigArrays().newObjectArray(owningBucketOrdinals.size())
198+
) {
199199

200-
// build buckets and store them sorted
201-
final int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());
200+
InternalDateHierarchy[] results = new InternalDateHierarchy[Math.toIntExact(owningBucketOrdinals.size())];
202201

203-
PathSortedTree<String, InternalDateHierarchy.InternalBucket> pathSortedTree = new PathSortedTree<>(order.comparator(), size);
202+
for (int ordIdx = 0; ordIdx < owningBucketOrdinals.size(); ordIdx++) {
203+
// assert owningBucketOrdinals[ordIdx] == 0;
204+
assert owningBucketOrdinals.get(ordIdx) == 0;
204205

205-
InternalDateHierarchy.InternalBucket spare;
206-
for (int i = 0; i < bucketOrds.size(); i++) {
207-
spare = new InternalDateHierarchy.InternalBucket(0, null, null, null, 0, null);
206+
// build buckets and store them sorted
207+
final int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());
208208

209-
BytesRef term = new BytesRef();
210-
bucketOrds.get(i, term);
211-
String[] paths = term.utf8ToString().split("/", -1);
209+
PathSortedTree<String, InternalDateHierarchy.InternalBucket> pathSortedTree = new PathSortedTree<>(
210+
order.comparator(),
211+
size
212+
);
212213

213-
spare.paths = paths;
214-
spare.key = term;
215-
spare.level = paths.length - 1;
216-
spare.name = paths[spare.level];
217-
spare.docCount = bucketDocCount(i);
218-
spare.bucketOrd = i;
214+
InternalDateHierarchy.InternalBucket spare;
215+
for (int i = 0; i < bucketOrds.size(); i++) {
216+
spare = new InternalDateHierarchy.InternalBucket(0, null, null, null, 0, null);
219217

220-
pathSortedTree.add(spare.paths, spare);
221-
}
218+
BytesRef term = new BytesRef();
219+
bucketOrds.get(i, term);
220+
String[] paths = term.utf8ToString().split("/", -1);
222221

223-
// Get the top buckets
224-
topBucketsPerOrd[ordIdx] = new InternalDateHierarchy.InternalBucket[size];
225-
long otherHierarchyNodes = pathSortedTree.getFullSize();
226-
Iterator<InternalDateHierarchy.InternalBucket> iterator = pathSortedTree.consumer();
227-
for (int i = 0; i < size; i++) {
228-
final InternalDateHierarchy.InternalBucket bucket = iterator.next();
229-
topBucketsPerOrd[ordIdx][i] = bucket;
230-
otherHierarchyNodes -= 1;
231-
}
222+
spare.paths = paths;
223+
spare.key = term;
224+
spare.level = paths.length - 1;
225+
spare.name = paths[spare.level];
226+
spare.docCount = bucketDocCount(i);
227+
spare.bucketOrd = i;
232228

233-
results[ordIdx] = new InternalDateHierarchy(
234-
name,
235-
Arrays.asList(topBucketsPerOrd[ordIdx]),
236-
order,
237-
minDocCount,
238-
bucketCountThresholds.getRequiredSize(),
239-
bucketCountThresholds.getShardSize(),
240-
otherHierarchyNodes,
241-
metadata()
242-
);
243-
}
229+
pathSortedTree.add(spare.paths, spare);
230+
}
244231

245-
// Build sub-aggregations for pruned buckets
246-
buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, aggregations) -> b.aggregations = aggregations);
232+
// Get the top buckets
233+
topBucketsPerOrd.set(ordIdx, new InternalDateHierarchy.InternalBucket[size]);
234+
// topBucketsPerOrd[ordIdx] = new InternalDateHierarchy.InternalBucket[size];
235+
long otherHierarchyNodes = pathSortedTree.getFullSize();
236+
Iterator<InternalDateHierarchy.InternalBucket> iterator = pathSortedTree.consumer();
237+
for (int i = 0; i < size; i++) {
238+
final InternalDateHierarchy.InternalBucket bucket = iterator.next();
239+
topBucketsPerOrd.get(ordIdx)[i] = bucket;
240+
otherHierarchyNodes -= 1;
241+
}
247242

248-
return results;
243+
results[ordIdx] = new InternalDateHierarchy(
244+
name,
245+
Arrays.asList(topBucketsPerOrd.get(ordIdx)),
246+
order,
247+
minDocCount,
248+
bucketCountThresholds.getRequiredSize(),
249+
bucketCountThresholds.getShardSize(),
250+
otherHierarchyNodes,
251+
metadata()
252+
);
253+
}
254+
255+
// Build sub-aggregations for pruned buckets
256+
buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, aggregations) -> b.aggregations = aggregations);
257+
258+
return results;
259+
}
249260
}
250261

251262
@Override

src/main/java/org/opendatasoft/elasticsearch/search/aggregations/bucket/InternalDateHierarchy.java

Lines changed: 63 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
import org.apache.lucene.util.BytesRef;
44
import org.elasticsearch.common.io.stream.StreamInput;
55
import org.elasticsearch.common.io.stream.StreamOutput;
6-
import org.elasticsearch.search.aggregations.Aggregations;
6+
import org.elasticsearch.search.aggregations.AggregationReduceContext;
7+
import org.elasticsearch.search.aggregations.AggregatorReducer;
78
import org.elasticsearch.search.aggregations.BucketOrder;
89
import org.elasticsearch.search.aggregations.InternalAggregation;
910
import org.elasticsearch.search.aggregations.InternalAggregations;
@@ -15,11 +16,13 @@
1516

1617
import java.io.IOException;
1718
import java.util.ArrayList;
19+
import java.util.Arrays;
1820
import java.util.Iterator;
1921
import java.util.LinkedHashMap;
2022
import java.util.List;
2123
import java.util.Map;
2224
import java.util.Objects;
25+
import java.util.TreeMap;
2326

2427
/**
2528
* An internal implementation of {@link InternalMultiBucketAggregation}
@@ -28,11 +31,67 @@
2831
*/
2932
public class InternalDateHierarchy extends InternalMultiBucketAggregation<InternalDateHierarchy, InternalDateHierarchy.InternalBucket> {
3033

34+
@Override
35+
protected AggregatorReducer getLeaderReducer(AggregationReduceContext reduceContext, int size) {
36+
Map<BytesRef, List<InternalBucket>> buckets = new LinkedHashMap<>();
37+
38+
return new AggregatorReducer() {
39+
private long otherHierarchyNodes = 0;
40+
41+
@Override
42+
public void accept(InternalAggregation aggregation) {
43+
InternalDateHierarchy dateHierarchy = (InternalDateHierarchy) aggregation;
44+
45+
otherHierarchyNodes += dateHierarchy.getSumOtherHierarchyNodes();
46+
47+
for (InternalBucket bucket : dateHierarchy.buckets) {
48+
List<InternalBucket> existingBuckets = buckets.get(bucket.key);
49+
if (existingBuckets == null) {
50+
existingBuckets = new ArrayList<>(size);
51+
buckets.put(bucket.key, existingBuckets);
52+
}
53+
existingBuckets.add(bucket);
54+
}
55+
}
56+
57+
@Override
58+
public InternalAggregation get() {
59+
final int size = !reduceContext.isFinalReduce() ? buckets.size() : Math.min(requiredSize, buckets.size());
60+
PathSortedTree<String, InternalBucket> ordered = new PathSortedTree<>(order.comparator(), size);
61+
62+
for (List<InternalBucket> sameTermBuckets : buckets.values()) {
63+
final InternalBucket b = reduceBucket(sameTermBuckets, reduceContext);
64+
if (b.getDocCount() >= minDocCount || !reduceContext.isFinalReduce()) {
65+
reduceContext.consumeBucketsAndMaybeBreak(1);
66+
ordered.add(b.paths, b);
67+
} else {
68+
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(b));
69+
}
70+
}
71+
72+
long sum_other_hierarchy_nodes = ordered.getFullSize() - size + otherHierarchyNodes;
73+
74+
return new InternalDateHierarchy(
75+
getName(),
76+
ordered.getAsList(),
77+
order,
78+
minDocCount,
79+
requiredSize,
80+
shardSize,
81+
sum_other_hierarchy_nodes,
82+
getMetadata()
83+
);
84+
}
85+
};
86+
}
87+
3188
/**
3289
* The bucket class of InternalDateHierarchy.
3390
* @see MultiBucketsAggregation.Bucket
3491
*/
35-
public static class InternalBucket extends InternalMultiBucketAggregation.InternalBucket implements KeyComparable<InternalBucket> {
92+
public static class InternalBucket extends InternalMultiBucketAggregation.InternalBucketWritable
93+
implements
94+
KeyComparable<InternalBucket> {
3695

3796
BytesRef key;
3897
String name;
@@ -104,11 +163,10 @@ public long getDocCount() {
104163
}
105164

106165
@Override
107-
public Aggregations getAggregations() {
166+
public InternalAggregations getAggregations() {
108167
return aggregations;
109168
}
110169

111-
@Override
112170
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
113171
builder.startObject();
114172
builder.field(CommonFields.DOC_COUNT.getPreferredName(), docCount);
@@ -214,62 +272,7 @@ public List<InternalBucket> getBuckets() {
214272
return buckets;
215273
}
216274

217-
/**
218-
* Reduces the given aggregations to a single one and returns it.
219-
*/
220-
@Override
221-
public InternalDateHierarchy reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
222-
Map<BytesRef, List<InternalBucket>> buckets = null;
223-
long otherHierarchyNodes = 0;
224-
225-
// extract buckets from aggregations
226-
for (InternalAggregation aggregation : aggregations) {
227-
InternalDateHierarchy dateHierarchy = (InternalDateHierarchy) aggregation;
228-
if (buckets == null) {
229-
buckets = new LinkedHashMap<>();
230-
}
231-
232-
otherHierarchyNodes += dateHierarchy.getSumOtherHierarchyNodes();
233-
234-
for (InternalBucket bucket : dateHierarchy.buckets) {
235-
List<InternalBucket> existingBuckets = buckets.get(bucket.key);
236-
if (existingBuckets == null) {
237-
existingBuckets = new ArrayList<>(aggregations.size());
238-
buckets.put(bucket.key, existingBuckets);
239-
}
240-
existingBuckets.add(bucket);
241-
}
242-
}
243-
244-
// reduce and sort buckets depending of ordering rules
245-
final int size = !reduceContext.isFinalReduce() ? buckets.size() : Math.min(requiredSize, buckets.size());
246-
PathSortedTree<String, InternalBucket> ordered = new PathSortedTree<>(order.comparator(), size);
247-
for (List<InternalBucket> sameTermBuckets : buckets.values()) {
248-
249-
final InternalBucket b = reduceBucket(sameTermBuckets, reduceContext);
250-
if (b.getDocCount() >= minDocCount || !reduceContext.isFinalReduce()) {
251-
reduceContext.consumeBucketsAndMaybeBreak(1);
252-
ordered.add(b.paths, b);
253-
} else {
254-
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(b));
255-
}
256-
}
257-
258-
long sum_other_hierarchy_nodes = ordered.getFullSize() - size + otherHierarchyNodes;
259-
return new InternalDateHierarchy(
260-
getName(),
261-
ordered.getAsList(),
262-
order,
263-
minDocCount,
264-
requiredSize,
265-
shardSize,
266-
sum_other_hierarchy_nodes,
267-
getMetadata()
268-
);
269-
}
270-
271-
@Override
272-
protected InternalBucket reduceBucket(List<InternalBucket> buckets, ReduceContext context) {
275+
protected InternalBucket reduceBucket(List<InternalBucket> buckets, AggregationReduceContext context) {
273276
List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());
274277
InternalBucket reduced = null;
275278
for (InternalBucket bucket : buckets) {

0 commit comments

Comments
 (0)