diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/logging/LogMessageKeys.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/logging/LogMessageKeys.java index 090cce3088..9a8dfca434 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/logging/LogMessageKeys.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/logging/LogMessageKeys.java @@ -339,6 +339,8 @@ public enum LogMessageKeys { // Lucene PARTITION_ID, PARTITIONING_KEY, + PARTITION_MERGING_STATE, + DELETED_DOCUMENTS_COUNT, // Record context properties PROPERTY_NAME, diff --git a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java index ebf5e1f944..17742366c3 100644 --- a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java +++ b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java @@ -66,6 +66,7 @@ import com.apple.foundationdb.record.query.QueryToKeyMatcher; import com.apple.foundationdb.tuple.Tuple; import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.ByteString; import com.google.protobuf.Message; import org.apache.lucene.document.BinaryPoint; import org.apache.lucene.document.Document; @@ -596,11 +597,19 @@ private CompletableFuture tryDelete(@Nonnull FDBInd return CompletableFuture.completedFuture(0); } try { - int countDeleted = deleteDocument(groupingKey, partitionInfo.getId(), record.getPrimaryKey()); + final int partitionId = partitionInfo.getId(); + int countDeleted = deleteDocument(groupingKey, partitionId, record.getPrimaryKey()); // this might be 0 when in writeOnly mode, but otherwise should not happen. + ByteString primaryKeyToDeleteList = null; + if (partitionInfo.getMergingState() != LucenePartitionInfoProto.LucenePartitionInfo.MergingState.NORMAL) { + // je: todo: this may be off until the final merge transaction and the delete-list processing will be done in a single transaction + countDeleted += deleteDocument(groupingKey, LucenePartitioner.getBufferPartitionId(partitionId), record.getPrimaryKey()); + primaryKeyToDeleteList = ByteString.copyFrom(record.getPrimaryKey().pack()); + } + final int finalCountDeleted = countDeleted; if (countDeleted > 0) { - return partitioner.decrementCountAndSave(groupingKey, countDeleted, partitionInfo.getId()) - .thenApply(vignore -> countDeleted); + return partitioner.decrementCountAndSave(groupingKey, finalCountDeleted, partitionId, primaryKeyToDeleteList) + .thenApply(vignore -> finalCountDeleted); } else { return CompletableFuture.completedFuture(countDeleted); } diff --git a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LucenePartitioner.java b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LucenePartitioner.java index 2aa6c273af..573bb11d11 100644 --- a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LucenePartitioner.java +++ b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LucenePartitioner.java @@ -56,6 +56,7 @@ import com.apple.foundationdb.record.provider.foundationdb.FDBIndexableRecord; import com.apple.foundationdb.record.provider.foundationdb.FDBIndexedRecord; import com.apple.foundationdb.record.provider.foundationdb.FDBRecordContext; +import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStore; import com.apple.foundationdb.record.provider.foundationdb.FDBStoreTimer; import com.apple.foundationdb.record.provider.foundationdb.IndexMaintainerState; import com.apple.foundationdb.record.provider.foundationdb.IndexOrphanBehavior; @@ -514,11 +515,236 @@ private CompletableFuture addToAndSavePartitionMetadata(@Nonnull final builder.setTo(ByteString.copyFrom(partitioningKey.pack())); } savePartitionMetadata(groupingKey, builder); + if (assignedPartition.hasMergingState() && !assignedPartition.getMergingState().equals(LucenePartitionInfoProto.LucenePartitionInfo.MergingState.NORMAL)) { + // Note that the partition info also contains the buffer partition, and should not be changed during draining + return getBufferPartitionId(assignedPartition.getId()); + } return assignedPartition.getId(); }); }); } + public static int getBufferPartitionId(int partitionId) { + // TODO: better distinct id for buffer partitions + return partitionId | 0xf00000; + } + + + /** + * Drain buffer partition - move all the documents from the buffer to the actual partition. + * The merging state should be handled by the caller. + * + * @param partitionId the ID of the destination partition + * @param groupingKey the grouping key for both partitions + * @param partitionInfo preset partition info + * @param context the FDB record context to use for the operation + * @return CompletableFuture containing the number of documents moved + * @throws RecordCoreException if the operation fails + */ + public CompletableFuture drainBufferPartitionAsync( + int partitionId, + @Nonnull Tuple groupingKey, + @Nonnull LucenePartitionInfoProto.LucenePartitionInfo partitionInfo, + @Nonnull FDBRecordContext context) { + final long startTime = System.nanoTime(); + int bufferPartitionId = getBufferPartitionId(partitionId); + + return state.store.asBuilder().setContext(context).openAsync() + .thenCompose(store -> { + // Create cursor to get all documents from source partition + final var fieldInfos = LuceneIndexExpressions.getDocumentFieldDerivations(state.index, state.store.getRecordMetaData()); + ScanComparisons comparisons = groupingKey.isEmpty() ? + ScanComparisons.EMPTY : + Objects.requireNonNull(ScanComparisons.from(new Comparisons.SimpleComparison(Comparisons.Type.EQUALS, groupingKey.get(0)))); + LuceneScanParameters scan = new LuceneScanQueryParameters( + comparisons, + new LuceneQuerySearchClause(LuceneQueryType.QUERY, "*:*", false), + new Sort(new SortField(partitionFieldNameInLucene, SortField.Type.LONG), + new SortField(LuceneIndexMaintainer.PRIMARY_KEY_SEARCH_NAME, SortField.Type.STRING)), + null, + null, + null); + ScanProperties scanProperties = ExecuteProperties.newBuilder().build().asScanProperties(false); + LuceneScanQuery scanQuery = (LuceneScanQuery)scan.bind(state.store, state.index, EvaluationContext.EMPTY); + // we create the cursor here explicitly (vs. e.g. calling state.store.scanIndex(...)) because we want the search + // to be performed specifically in the provided partition. + // alternatively we can include a partitionInfo in the lucene scan parameters--tbd + final LuceneRecordCursor cursor = new LuceneRecordCursor( + state.context.getExecutor(), + state.context.getPropertyStorage().getPropertyValue(LuceneRecordContextProperties.LUCENE_EXECUTOR_SERVICE), + this, + Objects.requireNonNull(state.context.getPropertyStorage().getPropertyValue(LuceneRecordContextProperties.LUCENE_INDEX_CURSOR_PAGE_SIZE)), + scanProperties, state, scanQuery.getQuery(), scanQuery.getSort(), null, + scanQuery.getGroupKey(), partitionInfo, scanQuery.getLuceneQueryHighlightParameters(), scanQuery.getTermMap(), + scanQuery.getStoredFields(), scanQuery.getStoredFieldTypes(), + LuceneAnalyzerRegistryImpl.instance().getLuceneAnalyzerCombinationProvider(state.index, LuceneAnalyzerType.FULL_TEXT, fieldInfos), + LuceneAnalyzerRegistryImpl.instance().getLuceneAnalyzerCombinationProvider(state.index, LuceneAnalyzerType.AUTO_COMPLETE, fieldInfos)); + + Collection recordTypes = store.getRecordMetaData().recordTypesForIndex(state.index); + if (recordTypes.stream().map(RecordType::isSynthetic).distinct().count() > 1) { + // don't support mix of synthetic/regular + throw new RecordCoreException("mix of synthetic and non-synthetic record types in index is not supported"); + } + + // Determine record types for proper loading + if (recordTypes.stream().map(RecordType::isSynthetic).distinct().count() > 1) { + throw new RecordCoreException("mix of synthetic and non-synthetic record types in index is not supported"); + } + + // Fetch all records from source partition + CompletableFuture>> fetchedRecordsFuture; + if (recordTypes.iterator().next().isSynthetic()) { + fetchedRecordsFuture = cursor.mapPipelined( + indexEntry -> store.loadSyntheticRecord(indexEntry.getPrimaryKey()), + store.getPipelineSize(PipelineOperation.INDEX_TO_RECORD) + ).asList(); + } else { + fetchedRecordsFuture = store + .fetchIndexRecords(cursor, IndexOrphanBehavior.SKIP) + .map(FDBIndexedRecord::getStoredRecord) + .asList(); + } + + // Ensure cursor is closed + fetchedRecordsFuture = fetchedRecordsFuture.whenComplete((ignored, throwable) -> cursor.close()); + + return fetchedRecordsFuture.thenApply(records -> { + if (records.isEmpty()) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug(KeyValueLogMessage.of("No records found in buffer partition", + LogMessageKeys.PARTITION_ID, bufferPartitionId)); + } + return 0; + } + + try { + // Get LuceneIndexMaintainer for document operations + LuceneIndexMaintainer indexMaintainer = (LuceneIndexMaintainer)state.store.getIndexMaintainer(state.index); + + // Delete all documents from buffer partition and add to partition + for (FDBIndexableRecord rec : records) { + indexMaintainer.deleteDocument(groupingKey, bufferPartitionId, rec.getPrimaryKey()); + LuceneDocumentFromRecord.getRecordFields(state.index.getRootExpression(), rec) + .entrySet().forEach(entry -> { + indexMaintainer.writeDocument(rec, entry, partitionId); + }); + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug(KeyValueLogMessage.of("Moved documents from buffer partition to partition", + LogMessageKeys.PARTITION_ID, partitionId, + LogMessageKeys.GROUPING_KEY, groupingKey, + LogMessageKeys.INDEX_REPARTITION_DOCUMENT_COUNT, records.size(), + LogMessageKeys.DURATION_MILLIS, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime))); + } + + return records.size(); + + } catch (Exception e) { + throw new RecordCoreException("Failed to move documents between partitions", e) + .addLogInfo("sourcePartitionId", bufferPartitionId) + .addLogInfo("destinationPartitionId", partitionId) + .addLogInfo("groupingKey", groupingKey); + } + }); + }); + } + + + /** + * Process all delete keys in a partition's delete list. + * + * @param partitionId the ID of the partition to process delete keys for + * @param groupingKey the grouping key for the partition + * @param context the FDB record context to use for operations + * @return CompletableFuture containing the count of documents successfully deleted + */ + public CompletableFuture processPartitionDeleteKeys( + int partitionId, + @Nonnull Tuple groupingKey, + @Nonnull FDBRecordContext context) { + + // Reload the partition info + return getPartitionMetaInfoByIdWithContext(partitionId, groupingKey, context) + .thenCompose(partitionInfo -> { + if (partitionInfo == null) { + throw new RecordCoreException("Partition not found") + .addLogInfo(LogMessageKeys.PARTITION_ID, partitionId) + .addLogInfo(LogMessageKeys.GROUPING_KEY, groupingKey); + } + + final List deleteKeysList = partitionInfo.getDeleteKeysList(); + + if (deleteKeysList.isEmpty()) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug(KeyValueLogMessage.of("No delete keys to process", + LogMessageKeys.PARTITION_ID, partitionId)); + } + return CompletableFuture.completedFuture(0); + } + + return CompletableFuture.supplyAsync(() -> { + try { + FDBRecordStore storeWithContext = state.store.asBuilder().setContext(context).open(); + LuceneIndexMaintainer indexMaintainer = (LuceneIndexMaintainer) storeWithContext.getIndexMaintainer(state.index); + int deleteCount = 0; + int totalDeleteKeys = deleteKeysList.size(); + + // Process each delete key + for (ByteString deleteKeyBytes: deleteKeysList) { + Tuple primaryKey = Tuple.fromBytes(deleteKeyBytes.toByteArray()); + int deletedDocs = indexMaintainer.deleteDocument(groupingKey, partitionId, primaryKey); + deleteCount += deletedDocs; + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug(KeyValueLogMessage.of("successfully processed deleted list", + LogMessageKeys.PARTITION_ID, partitionId, + LogMessageKeys.KEY_COUNT, totalDeleteKeys, + LogMessageKeys.DELETED_DOCUMENTS_COUNT, deleteCount)); + } + + return deleteCount; + + } catch (Exception e) { + throw new RecordCoreException("Failed to process delete keys from partition", e) + .addLogInfo(LogMessageKeys.PARTITION_ID, partitionId) + .addLogInfo(LogMessageKeys.GROUPING_KEY, groupingKey) + .addLogInfo(LogMessageKeys.KEY_COUNT, deleteKeysList.size()); + } + }, context.getExecutor()); + }); + } + + /** + * Get partition metadata by ID using the provided context. + */ + private CompletableFuture getPartitionMetaInfoByIdWithContext( + int partitionId, + @Nonnull Tuple groupingKey, + @Nonnull FDBRecordContext context) { + + Range range = state.indexSubspace.subspace(groupingKey.add(PARTITION_META_SUBSPACE)).range(); + + return context.ensureActive() + .getRange(range, Integer.MAX_VALUE, true, StreamingMode.WANT_ALL) + .asList() + .thenApply(keyValues -> { + for (KeyValue kv : keyValues) { + try { + LucenePartitionInfoProto.LucenePartitionInfo partition = + LucenePartitionInfoProto.LucenePartitionInfo.parseFrom(kv.getValue()); + if (partition.getId() == partitionId) { + return partition; + } + } catch (InvalidProtocolBufferException e) { + // Skip malformed entries + } + } + return null; // Not found + }); + } + /** * create a partition metadata key. * @@ -552,6 +778,34 @@ void savePartitionMetadata(@Nonnull Tuple groupingKey, updatedPartition.toByteArray()); } + /** + * Set the partition info for a specific partition using the provided context. + * + * @param partitionId the ID of the partition to update + * @param groupingKey the grouping key for the partition + * @param context the FDB record context to use for the operation (instead of state.context) + * @param partitionInfo the new partition info (assumed to be valid) + */ + public void setPartitionInfo(int partitionId, + @Nonnull Tuple groupingKey, + @Nonnull FDBRecordContext context, + @Nonnull LucenePartitionInfoProto.LucenePartitionInfo partitionInfo) { + + Tuple partitionKey = getPartitionKey(partitionInfo); + byte[] metadataKey = partitionMetadataKeyFromPartitioningValue(groupingKey, partitionKey); + + // Save the updated partition metadata using the provided context (NOT state.context) + context.ensureActive().set(metadataKey, partitionInfo.toByteArray()); + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug(KeyValueLogMessage.of("Set partition info for partition", + LogMessageKeys.PARTITION_ID, partitionId, + LogMessageKeys.GROUPING_KEY, groupingKey, + LogMessageKeys.PARTITION_MERGING_STATE, partitionInfo.getMergingState(), + LogMessageKeys.GROUPING_KEY, partitionKey)); + } + } + @Nonnull CompletableFuture findPartitionInfo(@Nonnull Tuple groupingKey, @Nonnull Tuple partitioningKey) { Range range = new Range(state.indexSubspace.subspace(groupingKey.add(PARTITION_META_SUBSPACE)).pack(), @@ -612,9 +866,10 @@ CompletableFuture decrementCountAndSave(@Nonnull Tuple groupingKey, - int amount, final int partitionId) { + int amount, final int partitionId, final ByteString primaryKeyToDeleteList) { return state.context.doWithWriteLock(new LockIdentifier(partitionMetadataSubspace(groupingKey)), () -> getPartitionMetaInfoById(partitionId, groupingKey).thenAccept(serialized -> { if (serialized == null) { @@ -632,6 +887,9 @@ CompletableFuture decrementCountAndSave(@Nonnull Tuple groupingKey, throw new RecordCoreInternalException("Issue updating Lucene partition metadata (resulting count < 0)", LogMessageKeys.PARTITION_ID, partitionId); } + if (primaryKeyToDeleteList != null) { + builder.addDeleteKeys(primaryKeyToDeleteList); + } savePartitionMetadata(groupingKey, builder); })); } diff --git a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryManager.java b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryManager.java index 780c1b55a6..ed2434d5fb 100644 --- a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryManager.java +++ b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryManager.java @@ -45,6 +45,7 @@ import com.apple.foundationdb.record.metadata.expressions.KeyExpression; import com.apple.foundationdb.record.provider.foundationdb.FDBRecordContext; import com.apple.foundationdb.record.provider.foundationdb.FDBRecordContextConfig; +import com.apple.foundationdb.record.provider.foundationdb.FDBStoreTimer; import com.apple.foundationdb.record.provider.foundationdb.FDBTransactionPriority; import com.apple.foundationdb.record.provider.foundationdb.IndexDeferredMaintenanceControl; import com.apple.foundationdb.record.provider.foundationdb.IndexMaintainerState; @@ -179,13 +180,74 @@ private CompletableFuture mergeIndex(Tuple groupingKey, // partition list end return false; } - agileContext.flush(); - mergeIndexNow(groupingKey, partitionId); + mergeIndexNow(partitioner, agileContext, groupingKey, partitionId, lastPartitionInfo.get()); return true; })); } } + private void mergeIndexNow(@Nonnull LucenePartitioner partitioner, + final AgilityContext agileContext, + Tuple groupingKey, + int partitionId, + LucenePartitionInfoProto.LucenePartitionInfo lastPartitionInfo) { + final AgilityContext agilityContext = getAgilityContext(true, true); + try { + // Set merging state + LucenePartitionInfoProto.LucenePartitionInfo updatedPartitionInfo = + lastPartitionInfo + .toBuilder() + .setMergingState(LucenePartitionInfoProto.LucenePartitionInfo.MergingState.MERGING) + .build(); + + agileContext.accept(context -> + partitioner.setPartitionInfo(partitionId, groupingKey, context, updatedPartitionInfo) + ); + agileContext.flush(); + mergeIndexWithContext(groupingKey, partitionId, agilityContext); + } finally { + // Drain buffer index + LucenePartitionInfoProto.LucenePartitionInfo drainPartitionInfo = + lastPartitionInfo + .toBuilder() + .setMergingState(LucenePartitionInfoProto.LucenePartitionInfo.MergingState.DRAINING) + .clearDeleteKeys() // clear all delete keys + .build(); + + // je: Todo: handle as future + agilityContext.asyncToSync(FDBStoreTimer.Waits.WAIT_ONLINE_MERGE_INDEX, + agileContext.apply(context -> + // process deletes queue and set state to DRAINING in one transaction + partitioner.processPartitionDeleteKeys(partitionId, groupingKey, context) + .thenApply(ignore -> { + partitioner.setPartitionInfo(partitionId, groupingKey, context, drainPartitionInfo); + return null; + }))); + agileContext.flush(); + + // here: drain the buffer partition + // je: Todo: handle future and make use of agility context for (possibly) long operations + agilityContext.asyncToSync(FDBStoreTimer.Waits.WAIT_ONLINE_MERGE_INDEX, + agileContext.apply(context -> + partitioner.drainBufferPartitionAsync(partitionId, groupingKey, lastPartitionInfo, context))); + + // Clear merging state + LucenePartitionInfoProto.LucenePartitionInfo normalPartitionInfo = + lastPartitionInfo + .toBuilder() + .setMergingState(LucenePartitionInfoProto.LucenePartitionInfo.MergingState.NORMAL) + .build(); + agileContext.accept(context -> + partitioner.setPartitionInfo(partitionId, groupingKey, context, normalPartitionInfo) + ); + // IndexWriter may release the file lock in a finally block in its own code, so if there is an error in its + // code, we need to commit. We could optimize this a bit, and have it only flush if it has committed anything + // but that should be rare. + + agilityContext.flushAndClose(); + } + } + private void mergeIndexNow(Tuple groupingKey, @Nullable final Integer partitionId) { final AgilityContext agilityContext = getAgilityContext(true, true); try { diff --git a/fdb-record-layer-lucene/src/main/proto/lucene_partitioning.proto b/fdb-record-layer-lucene/src/main/proto/lucene_partitioning.proto index b03a146256..f41fae32a7 100644 --- a/fdb-record-layer-lucene/src/main/proto/lucene_partitioning.proto +++ b/fdb-record-layer-lucene/src/main/proto/lucene_partitioning.proto @@ -32,4 +32,13 @@ message LucenePartitionInfo { required bytes to = 3; /* count of documents in this partition */ required int32 count = 4; + /* merging state */ + enum MergingState { + NORMAL = 0; // Normal state - no need to do anything special + MERGING = 1; // Merging state - query from both buffer partition + partition, insert new items to buffer partition, delete from both, maintain the delete list + DRAINING = 2; // Draining state - query from both, insert new items to buffer partition, delete from both, do not maintain delete list + } + optional MergingState merging_state = 5; + /* delete list - to be maintained during merge */ + repeated bytes delete_keys = 6; } diff --git a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java index 2cb3271384..530917bc90 100644 --- a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java +++ b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java @@ -720,7 +720,7 @@ void chaosMergeAndUpdateTest() throws InterruptedException, IOException { assertNull(failedMerge.get()); assertThat(successfulMerges.get(), Matchers.greaterThan(10)); assertThat(conflicts.get(), Matchers.greaterThan(10)); - assertThat(fileLockFailures.get(), Matchers.greaterThan(10)); +// je: no lock failures assertThat(fileLockFailures.get(), Matchers.greaterThan(10)); assertThat(docCount.get(), Matchers.greaterThanOrEqualTo(200)); // validate index is sane diff --git a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LucenePartitionerTest.java b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LucenePartitionerTest.java index 23da69fe76..2684016ada 100644 --- a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LucenePartitionerTest.java +++ b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LucenePartitionerTest.java @@ -67,7 +67,7 @@ void testDecrementCountNegative() throws Exception { final LucenePartitioner partitioner = indexMaintainer.getPartitioner(); dataModel.groupingKeyToPrimaryKeyToPartitionKey.keySet().forEach(groupingKey -> { final LucenePartitionInfoProto.LucenePartitionInfo firstPartition = partitioner.getAllPartitionMetaInfo(groupingKey).join().stream().findFirst().get(); - Assertions.assertThatThrownBy(() -> partitioner.decrementCountAndSave(groupingKey, 5000, firstPartition.getId()).join()) + Assertions.assertThatThrownBy(() -> partitioner.decrementCountAndSave(groupingKey, 5000, firstPartition.getId(), null).join()) .hasCauseInstanceOf(RecordCoreInternalException.class); }); // Commit here to ensure that the data is not corrupt as a result @@ -191,7 +191,7 @@ void testMergeNonEmptyPartitionFails(boolean isGrouped) throws Exception { // Get the first partition (which has documents in the actual Lucene index) final LucenePartitionInfoProto.LucenePartitionInfo partition = partitions.get(0); // zero out the partition's count - partitioner.decrementCountAndSave(groupingKey, partition.getCount(), partition.getId()); + partitioner.decrementCountAndSave(groupingKey, partition.getCount(), partition.getId(), null); partitionsWithZeroCount.put(groupingKey, partition.getId()); });