Skip to content

Commit 2dca113

Browse files
author
Clément Tourrière
committed
Fixes some edge cases and adds a path array in serialization
Fixes min_depth parameter
1 parent 7a95d74 commit 2dca113

File tree

4 files changed

+59
-17
lines changed

4 files changed

+59
-17
lines changed

src/main/java/org/opendatasoft/elasticsearch/search/aggregations/bucket/InternalPathHierarchy.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
1818

1919
import java.io.IOException;
20+
import java.util.Arrays;
2021
import java.util.ArrayList;
2122
import java.util.Iterator;
2223
import java.util.List;
@@ -31,6 +32,7 @@
3132
public class InternalPathHierarchy extends InternalMultiBucketAggregation<InternalPathHierarchy,
3233
InternalPathHierarchy.InternalBucket> implements PathHierarchy {
3334
protected static final ParseField SUM_OF_OTHER_HIERARCHY_NODES = new ParseField("sum_other_hierarchy_nodes");
35+
protected static final ParseField PATHS = new ParseField("path");
3436

3537
/**
3638
* The bucket class of InternalPathHierarchy.
@@ -45,13 +47,16 @@ public static class InternalBucket extends InternalMultiBucketAggregation.Intern
4547
protected long docCount;
4648
protected InternalAggregations aggregations;
4749
protected int level;
50+
protected int minDepth;
4851
protected String basename;
4952

50-
public InternalBucket(long docCount, InternalAggregations aggregations, String basename, BytesRef term, int level, String[] paths) {
53+
public InternalBucket(long docCount, InternalAggregations aggregations, String basename,
54+
BytesRef term, int level, int minDepth, String[] paths) {
5155
termBytes = term;
5256
this.docCount = docCount;
5357
this.aggregations = aggregations;
5458
this.level = level;
59+
this.minDepth = minDepth;
5560
this.basename = basename;
5661
this.paths = paths;
5762
}
@@ -64,6 +69,7 @@ public InternalBucket(StreamInput in) throws IOException {
6469
docCount = in.readLong();
6570
aggregations = InternalAggregations.readAggregations(in);
6671
level = in.readInt();
72+
minDepth = in.readInt();
6773
basename = in.readString();
6874
int pathsSize = in.readInt();
6975
paths = new String[pathsSize];
@@ -81,6 +87,7 @@ public void writeTo(StreamOutput out) throws IOException {
8187
out.writeLong(docCount);
8288
aggregations.writeTo(out);
8389
out.writeInt(level);
90+
out.writeInt(minDepth);
8491
out.writeString(basename);
8592
out.writeInt(paths.length);
8693
for (String path: paths) {
@@ -229,7 +236,7 @@ public InternalPathHierarchy create(List<InternalBucket> buckets) {
229236
@Override
230237
public InternalBucket createBucket(InternalAggregations aggregations, InternalBucket prototype) {
231238
return new InternalBucket(prototype.docCount, aggregations, prototype.basename, prototype.termBytes,
232-
prototype.level, prototype.paths);
239+
prototype.level, prototype.minDepth, prototype.paths);
233240
}
234241

235242
@Override
@@ -271,7 +278,13 @@ public InternalPathHierarchy doReduce(List<InternalAggregation> aggregations, Re
271278
final InternalBucket b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext);
272279
if (b.getDocCount() >= minDocCount || !reduceContext.isFinalReduce()) {
273280
reduceContext.consumeBucketsAndMaybeBreak(1);
274-
ordered.add(b.paths, b);
281+
String [] pathsForTree;
282+
if (b.minDepth > 0) {
283+
pathsForTree = Arrays.copyOfRange(b.paths, b.minDepth, b.paths.length);
284+
} else {
285+
pathsForTree = b.paths;
286+
}
287+
ordered.add(pathsForTree, b);
275288
} else {
276289
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(b));
277290
}
@@ -311,6 +324,7 @@ public XContentBuilder doXContentBody(XContentBuilder builder, Params params) th
311324
builder.startObject();
312325
builder.field(CommonFields.KEY.getPreferredName(), currentBucket.basename);
313326
builder.field(CommonFields.DOC_COUNT.getPreferredName(), currentBucket.docCount);
327+
builder.field(PATHS.getPreferredName(), Arrays.copyOf(currentBucket.paths, currentBucket.paths.length -1));
314328
currentBucket.getAggregations().toXContentInternal(builder, params);
315329

316330
prevBucket = currentBucket;
@@ -331,7 +345,7 @@ public XContentBuilder doXContentBody(XContentBuilder builder, Params params) th
331345

332346
@Override
333347
protected int doHashCode() {
334-
return Objects.hash(buckets, separator, order, requiredSize, shardSize, otherHierarchyNodes);
348+
return Objects.hash(buckets, separator, order, requiredSize, shardSize, otherHierarchyNodes, minDocCount);
335349
}
336350

337351
@Override
@@ -340,6 +354,7 @@ protected boolean doEquals(Object obj) {
340354
return Objects.equals(buckets, that.buckets)
341355
&& Objects.equals(separator, that.separator)
342356
&& Objects.equals(order, that.order)
357+
&& Objects.equals(minDocCount, that.minDocCount)
343358
&& Objects.equals(requiredSize, that.requiredSize)
344359
&& Objects.equals(shardSize, that.shardSize)
345360
&& Objects.equals(otherHierarchyNodes, that.otherHierarchyNodes);

src/main/java/org/opendatasoft/elasticsearch/search/aggregations/bucket/PathHierarchyAggregationBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ protected XContentBuilder doXContentBody(XContentBuilder builder, Params params)
302302
*/
303303
@Override
304304
protected int innerHashCode() {
305-
return Objects.hash(separator, minDepth, maxDepth, depth, order);
305+
return Objects.hash(separator, minDepth, maxDepth, depth, order, minDocCount, bucketCountThresholds);
306306
}
307307

308308
@Override

src/main/java/org/opendatasoft/elasticsearch/search/aggregations/bucket/PathHierarchyAggregator.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import java.io.IOException;
2727
import java.util.ArrayList;
28+
import java.util.Arrays;
2829
import java.util.Iterator;
2930
import java.util.List;
3031
import java.util.Map;
@@ -120,6 +121,7 @@ public boolean equals(Object obj) {
120121
private final BytesRefHash bucketOrds;
121122
private final BucketOrder order;
122123
private final long minDocCount;
124+
private final int minDepth;
123125
private final BucketCountThresholds bucketCountThresholds;
124126
private final BytesRef separator;
125127

@@ -132,6 +134,7 @@ public PathHierarchyAggregator(
132134
long minDocCount,
133135
BucketCountThresholds bucketCountThresholds,
134136
BytesRef separator,
137+
int minDepth,
135138
Aggregator parent,
136139
List<PipelineAggregator> pipelineAggregators,
137140
Map<String, Object> metaData
@@ -143,6 +146,7 @@ public PathHierarchyAggregator(
143146
bucketOrds = new BytesRefHash(1, context.bigArrays());
144147
this.order = InternalOrder.validate(order, this);
145148
this.bucketCountThresholds = bucketCountThresholds;
149+
this.minDepth = minDepth;
146150
}
147151

148152
/**
@@ -173,7 +177,7 @@ public void collect(int doc, long owningBucketOrdinal) throws IOException {
173177
// SortedBinaryDocValues don't guarantee uniqueness so we need to take care of dups
174178
for (int i = 0; i < valuesCount; ++i) {
175179
final BytesRef bytesValue = values.nextValue();
176-
if (previous.get().equals(bytesValue)) {
180+
if (i > 0 && previous.get().equals(bytesValue)) {
177181
continue;
178182
}
179183
long bucketOrdinal = bucketOrds.add(bytesValue);
@@ -201,21 +205,32 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOE
201205

202206
InternalPathHierarchy.InternalBucket spare = null;
203207
for (int i = 0; i < bucketOrds.size(); i++) {
204-
spare = new InternalPathHierarchy.InternalBucket(0, null, null, new BytesRef(), 0, null);
208+
spare = new InternalPathHierarchy.InternalBucket(0, null, null, new BytesRef(), 0, 0, null);
205209
BytesRef term = new BytesRef();
206210
bucketOrds.get(i, term);
207211

208-
String [] paths = term.utf8ToString().split(Pattern.quote(separator.utf8ToString()), -1);
212+
String quotedPattern = Pattern.quote(separator.utf8ToString());
213+
214+
String [] paths = term.utf8ToString().split(quotedPattern, -1);
215+
216+
String [] pathsForTree;
217+
218+
if (minDepth > 0) {
219+
pathsForTree = Arrays.copyOfRange(paths, minDepth, paths.length);
220+
} else {
221+
pathsForTree = paths;
222+
}
209223

210224
spare.termBytes = BytesRef.deepCopyOf(term);
211225
spare.aggregations = bucketAggregations(i);
212-
spare.level = paths.length - 1;
226+
spare.level = pathsForTree.length - 1;
213227
spare.docCount = bucketDocCount(i);
214228
spare.basename = paths[paths.length - 1];
229+
spare.minDepth = minDepth;
215230
spare.bucketOrd = i;
216231
spare.paths = paths;
217232

218-
pathSortedTree.add(paths, spare);
233+
pathSortedTree.add(pathsForTree, spare);
219234

220235
consumeBucketsAndMaybeBreak(1);
221236
}

src/main/java/org/opendatasoft/elasticsearch/search/aggregations/bucket/PathHierarchyAggregatorFactory.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.opendatasoft.elasticsearch.search.aggregations.bucket;
22

33
import org.apache.lucene.index.LeafReaderContext;
4+
import org.apache.lucene.util.ArrayUtil;
45
import org.apache.lucene.util.BytesRef;
56
import org.apache.lucene.util.BytesRefBuilder;
67
import org.apache.lucene.util.FutureArrays;
@@ -102,7 +103,7 @@ protected Aggregator doCreateInternal(
102103
bucketCountThresholds.ensureValidity();
103104
return new PathHierarchyAggregator(
104105
name, factories, context,
105-
valuesSourceBytes, order, minDocCount, bucketCountThresholds, separator,
106+
valuesSourceBytes, order, minDocCount, bucketCountThresholds, separator, minDepth,
106107
parent, pipelineAggregators, metaData);
107108
}
108109

@@ -162,17 +163,17 @@ public boolean advanceExact(int docId) throws IOException {
162163

163164
// A new path needs to be added
164165
if (startNewValOffset != -1) {
166+
cleanVal.append(val.bytes, val.offset + startNewValOffset, offset - startNewValOffset);
165167
if (depth >= minDepth) {
166-
cleanVal.append(val.bytes, val.offset + startNewValOffset, offset - startNewValOffset);
167168
values[t++].copyBytes(cleanVal);
168-
cleanVal.append(separator);
169169
}
170170
startNewValOffset = -1;
171+
cleanVal.append(separator);
171172
depth ++;
172173
// two separators following each other
173174
} else if (keepBlankPath) {
174175
count++;
175-
grow();
176+
growExact();
176177
values[t++].copyBytes(cleanVal);
177178
cleanVal.append(separator);
178179
depth ++;
@@ -185,8 +186,10 @@ public boolean advanceExact(int docId) throws IOException {
185186
} else {
186187
if (startNewValOffset == -1) {
187188
startNewValOffset = offset;
188-
count++;
189-
grow();
189+
if (depth >= minDepth) {
190+
count++;
191+
growExact();
192+
}
190193
}
191194
}
192195
}
@@ -202,8 +205,17 @@ public boolean advanceExact(int docId) throws IOException {
202205
} else
203206
return false;
204207
}
205-
}
206208

209+
final void growExact() {
210+
if (values.length < count) {
211+
final int oldLen = values.length;
212+
values = ArrayUtil.growExact(values, count);
213+
for (int i = oldLen; i < count; ++i) {
214+
values[i] = new BytesRefBuilder();
215+
}
216+
}
217+
}
218+
}
207219

208220
/**
209221
* To get ValuesSource as sorted bytes.

0 commit comments

Comments
 (0)