Skip to content

Commit ec77e22

Browse files
authored
Merge pull request #8 from opendatasoft/fix/ordering
Fixes ordering issues. Closes #7
2 parents 863ab4a + dab0501 commit ec77e22

File tree

8 files changed

+320
-97
lines changed

8 files changed

+320
-97
lines changed

README.md

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,8 @@ Usage
2727
- `min_depth`: Set minimum depth level. Default to 0.
2828
- `max_depth`: Set maximum depth level. `-1` means no limit. Default to 3.
2929
- `depth`: Retrieve values for specified depth. Shortcut, instead of setting `min_depth` and `max_depth` parameters to the same value.
30-
- `keep_blank_path`: Keep blank path as bucket. if this option is set to false, chained separator will be ignored. Default to false.
31-
32-
Please note that `sum_other_doc_count` is returned alongside aggregation buckets. It returns the sum of doc_count which are not returned from shards due to size/shard_size and so can be used to calibrate size/shard_size.
30+
- `keep_blank_path`: Keep blank path as bucket. if this option is set to false, chained separator will be ignored. Default to false.
31+
- `min_doc_count`: Return buckets containing at least `min_doc_count` document. Default to 0
3332

3433

3534
Examples

src/main/java/org/opendatasoft/elasticsearch/search/aggregations/bucket/BucketPriorityQueue.java

Lines changed: 0 additions & 23 deletions
This file was deleted.

src/main/java/org/opendatasoft/elasticsearch/search/aggregations/bucket/InternalPathHierarchy.java

Lines changed: 54 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -7,31 +7,30 @@
77
import org.elasticsearch.common.xcontent.XContentBuilder;
88
import org.elasticsearch.search.aggregations.Aggregation;
99
import org.elasticsearch.search.aggregations.Aggregations;
10-
import org.elasticsearch.search.aggregations.KeyComparable;
1110
import org.elasticsearch.search.aggregations.BucketOrder;
12-
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
1311
import org.elasticsearch.search.aggregations.InternalAggregation;
1412
import org.elasticsearch.search.aggregations.InternalAggregations;
15-
import org.elasticsearch.search.aggregations.InternalOrder;
1613
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
14+
import org.elasticsearch.search.aggregations.InternalOrder;
15+
import org.elasticsearch.search.aggregations.KeyComparable;
16+
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
1717
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
1818

1919
import java.io.IOException;
2020
import java.util.ArrayList;
21-
import java.util.HashMap;
2221
import java.util.Iterator;
2322
import java.util.List;
2423
import java.util.Map;
2524
import java.util.Objects;
26-
import java.util.Arrays;
25+
import java.util.TreeMap;
2726

2827
/**
2928
* An internal implementation of {@link InternalMultiBucketAggregation} which extends {@link Aggregation}.
3029
* Mainly, returns the builder and makes the reduce of buckets.
3130
*/
3231
public class InternalPathHierarchy extends InternalMultiBucketAggregation<InternalPathHierarchy,
3332
InternalPathHierarchy.InternalBucket> implements PathHierarchy {
34-
protected static final ParseField SUM_OF_OTHER_DOC_COUNTS = new ParseField("sum_other_doc_count");
33+
protected static final ParseField SUM_OF_OTHER_HIERARCHY_NODES = new ParseField("sum_other_hierarchy_nodes");
3534

3635
/**
3736
* The bucket class of InternalPathHierarchy.
@@ -42,17 +41,19 @@ public static class InternalBucket extends InternalMultiBucketAggregation.Intern
4241

4342
BytesRef termBytes;
4443
long bucketOrd;
44+
protected String[] paths;
4545
protected long docCount;
4646
protected InternalAggregations aggregations;
4747
protected int level;
4848
protected String basename;
4949

50-
public InternalBucket(long docCount, InternalAggregations aggregations, String basename, BytesRef term, int level) {
50+
public InternalBucket(long docCount, InternalAggregations aggregations, String basename, BytesRef term, int level, String[] paths) {
5151
termBytes = term;
5252
this.docCount = docCount;
5353
this.aggregations = aggregations;
5454
this.level = level;
5555
this.basename = basename;
56+
this.paths = paths;
5657
}
5758

5859
/**
@@ -64,6 +65,11 @@ public InternalBucket(StreamInput in) throws IOException {
6465
aggregations = InternalAggregations.readAggregations(in);
6566
level = in.readInt();
6667
basename = in.readString();
68+
int pathsSize = in.readInt();
69+
paths = new String[pathsSize];
70+
for (int i=0; i < pathsSize; i++) {
71+
paths[i] = in.readString();
72+
}
6773
}
6874

6975
/**
@@ -76,6 +82,10 @@ public void writeTo(StreamOutput out) throws IOException {
7682
aggregations.writeTo(out);
7783
out.writeInt(level);
7884
out.writeString(basename);
85+
out.writeInt(paths.length);
86+
for (String path: paths) {
87+
out.writeString(path);
88+
}
7989
}
8090

8191
@Override
@@ -89,7 +99,7 @@ public String getKeyAsString() {
8999
}
90100

91101
@Override
92-
public int compareKey(InternalPathHierarchy.InternalBucket other) {
102+
public int compareKey(InternalBucket other) {
93103
return termBytes.compareTo(other.termBytes);
94104
}
95105

@@ -138,26 +148,28 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
138148
private BucketOrder order;
139149
private final int requiredSize;
140150
private final int shardSize;
141-
private final long otherDocCount;
142-
private final long minDocCount = 1;
151+
private final long otherHierarchyNodes;
152+
private final long minDocCount;
143153

144154
public InternalPathHierarchy(
145155
String name,
146156
List<InternalBucket> buckets,
147157
BucketOrder order,
158+
long minDocCount,
148159
int requiredSize,
149160
int shardSize,
150-
long otherDocCount,
161+
long otherHierarchyNodes,
151162
BytesRef separator,
152163
List<PipelineAggregator> pipelineAggregators,
153164
Map<String, Object> metaData
154165
) {
155166
super(name, pipelineAggregators, metaData);
156167
this.buckets = buckets;
157168
this.order = order;
169+
this.minDocCount = minDocCount;
158170
this.requiredSize = requiredSize;
159171
this.shardSize = shardSize;
160-
this.otherDocCount = otherDocCount;
172+
this.otherHierarchyNodes = otherHierarchyNodes;
161173
this.separator = separator;
162174
}
163175

@@ -167,11 +179,16 @@ public InternalPathHierarchy(
167179
public InternalPathHierarchy(StreamInput in) throws IOException {
168180
super(in);
169181
order = InternalOrder.Streams.readOrder(in);
182+
minDocCount = in.readVLong();
170183
requiredSize = readSize(in);
171184
shardSize = readSize(in);
172-
otherDocCount = in.readVLong();
185+
otherHierarchyNodes = in.readVLong();
173186
separator = in.readBytesRef();
174-
this.buckets = in.readList(InternalBucket::new);
187+
int bucketsSize = in.readInt();
188+
this.buckets = new ArrayList<>(bucketsSize);
189+
for (int i=0; i<bucketsSize; i++) {
190+
this.buckets.add(new InternalBucket(in));
191+
}
175192
}
176193

177194
/**
@@ -180,11 +197,15 @@ public InternalPathHierarchy(StreamInput in) throws IOException {
180197
@Override
181198
protected void doWriteTo(StreamOutput out) throws IOException {
182199
InternalOrder.Streams.writeOrder(order, out);
200+
out.writeVLong(minDocCount);
183201
writeSize(requiredSize, out);
184202
writeSize(shardSize, out);
185-
out.writeVLong(otherDocCount);
203+
out.writeVLong(otherHierarchyNodes);
186204
out.writeBytesRef(separator);
187-
out.writeList(buckets);
205+
out.writeInt(buckets.size());
206+
for (InternalBucket bucket: buckets) {
207+
bucket.writeTo(out);
208+
}
188209
}
189210

190211
@Override
@@ -196,20 +217,20 @@ protected int getShardSize() {
196217
return shardSize;
197218
}
198219

199-
public long getSumOfOtherDocCounts() {
200-
return otherDocCount;
220+
public long getSumOtherHierarchyNodes() {
221+
return otherHierarchyNodes;
201222
}
202223

203224
@Override
204225
public InternalPathHierarchy create(List<InternalBucket> buckets) {
205-
return new InternalPathHierarchy(this.name, buckets, order, requiredSize, shardSize, otherDocCount,
226+
return new InternalPathHierarchy(this.name, buckets, order, minDocCount, requiredSize, shardSize, otherHierarchyNodes,
206227
this.separator, this.pipelineAggregators(), this.metaData);
207228
}
208229

209230
@Override
210231
public InternalBucket createBucket(InternalAggregations aggregations, InternalBucket prototype) {
211232
return new InternalBucket(prototype.docCount, aggregations, prototype.basename, prototype.termBytes,
212-
prototype.level);
233+
prototype.level, prototype.paths);
213234
}
214235

215236
@Override
@@ -223,16 +244,16 @@ public List<InternalBucket> getBuckets() {
223244
@Override
224245
public InternalPathHierarchy doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
225246
Map<BytesRef, List<InternalBucket>> buckets = null;
226-
long otherDocCount = 0;
247+
long otherHierarchyNodes = 0;
227248

228249
// extract buckets from aggregations
229250
for (InternalAggregation aggregation : aggregations) {
230251
InternalPathHierarchy pathHierarchy = (InternalPathHierarchy) aggregation;
231252
if (buckets == null) {
232-
buckets = new HashMap<>();
253+
buckets = new TreeMap<>();
233254
}
234255

235-
otherDocCount += pathHierarchy.getSumOfOtherDocCounts();
256+
otherHierarchyNodes += pathHierarchy.getSumOtherHierarchyNodes();
236257

237258
for (InternalBucket bucket : pathHierarchy.buckets) {
238259
List<InternalBucket> existingBuckets = buckets.get(bucket.termBytes);
@@ -246,28 +267,20 @@ public InternalPathHierarchy doReduce(List<InternalAggregation> aggregations, Re
246267

247268
// reduce and sort buckets depending of ordering rules
248269
final int size = !reduceContext.isFinalReduce() ? buckets.size() : Math.min(requiredSize, buckets.size());
249-
final BucketPriorityQueue<InternalBucket> ordered = new BucketPriorityQueue<>(size, order.comparator(null));
270+
PathSortedTree<String, InternalBucket> ordered = new PathSortedTree<>(order.comparator(null), size);
250271
for (List<InternalBucket> sameTermBuckets : buckets.values()) {
251272
final InternalBucket b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext);
252-
if (b.docCount >= minDocCount || !reduceContext.isFinalReduce()) {
253-
InternalBucket removed = ordered.insertWithOverflow(b);
254-
if (removed != null) {
255-
otherDocCount += removed.getDocCount();
256-
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed));
257-
} else {
258-
reduceContext.consumeBucketsAndMaybeBreak(1);
259-
}
273+
if (b.getDocCount() >= minDocCount || !reduceContext.isFinalReduce()) {
274+
reduceContext.consumeBucketsAndMaybeBreak(1);
275+
ordered.add(b.paths, b);
260276
} else {
261277
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(b));
262278
}
263279
}
264-
InternalBucket[] reducedBuckets = new InternalBucket[ordered.size()];
265-
for (int i = ordered.size() - 1; i >= 0; i--) {
266-
reducedBuckets[i] = ordered.pop();
267-
}
268280

269-
return new InternalPathHierarchy(getName(), Arrays.asList(reducedBuckets), order, requiredSize, shardSize,
270-
otherDocCount, separator, pipelineAggregators(), getMetaData());
281+
long sum_other_hierarchy_nodes = ordered.getFullSize() - size + otherHierarchyNodes;
282+
return new InternalPathHierarchy(getName(), ordered.getAsList(), order, minDocCount, requiredSize, shardSize,
283+
sum_other_hierarchy_nodes, separator, pipelineAggregators(), getMetaData());
271284
}
272285

273286
private void doXContentInternal(XContentBuilder builder, Params params, InternalBucket currentBucket,
@@ -309,7 +322,7 @@ private void doXContentInternal(XContentBuilder builder, Params params, Internal
309322

310323
@Override
311324
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
312-
builder.field(SUM_OF_OTHER_DOC_COUNTS.getPreferredName(), otherDocCount);
325+
// builder.field(SUM_OF_OTHER_HIERARCHY_NODES.getPreferredName(), otherHierarchyNodes);
313326
Iterator<InternalBucket> bucketIterator = buckets.iterator();
314327
builder.startArray(CommonFields.BUCKETS.getPreferredName());
315328
if (bucketIterator.hasNext()) {
@@ -322,7 +335,7 @@ public XContentBuilder doXContentBody(XContentBuilder builder, Params params) th
322335

323336
@Override
324337
protected int doHashCode() {
325-
return Objects.hash(buckets, separator, order, requiredSize, shardSize, otherDocCount);
338+
return Objects.hash(buckets, separator, order, requiredSize, shardSize, otherHierarchyNodes);
326339
}
327340

328341
@Override
@@ -333,6 +346,6 @@ protected boolean doEquals(Object obj) {
333346
&& Objects.equals(order, that.order)
334347
&& Objects.equals(requiredSize, that.requiredSize)
335348
&& Objects.equals(shardSize, that.shardSize)
336-
&& Objects.equals(otherDocCount, that.otherDocCount);
349+
&& Objects.equals(otherHierarchyNodes, that.otherHierarchyNodes);
337350
}
338351
}

0 commit comments

Comments
 (0)