Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,8 @@ public enum LogMessageKeys {
// Lucene
PARTITION_ID,
PARTITIONING_KEY,
PARTITION_MERGING_STATE,
DELETED_DOCUMENTS_COUNT,

// Record context properties
PROPERTY_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import com.apple.foundationdb.record.query.QueryToKeyMatcher;
import com.apple.foundationdb.tuple.Tuple;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import org.apache.lucene.document.BinaryPoint;
import org.apache.lucene.document.Document;
Expand Down Expand Up @@ -596,11 +597,19 @@
return CompletableFuture.completedFuture(0);
}
try {
int countDeleted = deleteDocument(groupingKey, partitionInfo.getId(), record.getPrimaryKey());
final int partitionId = partitionInfo.getId();
int countDeleted = deleteDocument(groupingKey, partitionId, record.getPrimaryKey());
// this might be 0 when in writeOnly mode, but otherwise should not happen.
ByteString primaryKeyToDeleteList = null;
if (partitionInfo.getMergingState() != LucenePartitionInfoProto.LucenePartitionInfo.MergingState.NORMAL) {
// je: todo: this may be off until the final merge transaction and the delete-list processing will be done in a single transaction

Check warning on line 605 in fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java

View check run for this annotation

fdb.teamscale.io / Teamscale | Findings

fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java#L605

[New] todo: this may be off until the final merge transaction and the delete-list processing will be done in a single transaction https://fdb.teamscale.io/findings/details/foundationdb-fdb-record-layer?t=FORK_MR%2F3807%2Fjjezra%2Flucene_buffer_partition%3AHEAD&id=B2CD2D2D356568ACB8694D85CECD6E2E
countDeleted += deleteDocument(groupingKey, LucenePartitioner.getBufferPartitionId(partitionId), record.getPrimaryKey());
primaryKeyToDeleteList = ByteString.copyFrom(record.getPrimaryKey().pack());
}
final int finalCountDeleted = countDeleted;
if (countDeleted > 0) {
return partitioner.decrementCountAndSave(groupingKey, countDeleted, partitionInfo.getId())
.thenApply(vignore -> countDeleted);
return partitioner.decrementCountAndSave(groupingKey, finalCountDeleted, partitionId, primaryKeyToDeleteList)
.thenApply(vignore -> finalCountDeleted);
} else {
return CompletableFuture.completedFuture(countDeleted);
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import com.apple.foundationdb.record.metadata.expressions.KeyExpression;
import com.apple.foundationdb.record.provider.foundationdb.FDBRecordContext;
import com.apple.foundationdb.record.provider.foundationdb.FDBRecordContextConfig;
import com.apple.foundationdb.record.provider.foundationdb.FDBStoreTimer;
import com.apple.foundationdb.record.provider.foundationdb.FDBTransactionPriority;
import com.apple.foundationdb.record.provider.foundationdb.IndexDeferredMaintenanceControl;
import com.apple.foundationdb.record.provider.foundationdb.IndexMaintainerState;
Expand Down Expand Up @@ -179,13 +180,74 @@
// partition list end
return false;
}
agileContext.flush();
mergeIndexNow(groupingKey, partitionId);
mergeIndexNow(partitioner, agileContext, groupingKey, partitionId, lastPartitionInfo.get());
return true;
}));
}
}

private void mergeIndexNow(@Nonnull LucenePartitioner partitioner,
final AgilityContext agileContext,
Tuple groupingKey,
int partitionId,
LucenePartitionInfoProto.LucenePartitionInfo lastPartitionInfo) {
final AgilityContext agilityContext = getAgilityContext(true, true);
try {
// Set merging state
LucenePartitionInfoProto.LucenePartitionInfo updatedPartitionInfo =
lastPartitionInfo
.toBuilder()
.setMergingState(LucenePartitionInfoProto.LucenePartitionInfo.MergingState.MERGING)
.build();

agileContext.accept(context ->
partitioner.setPartitionInfo(partitionId, groupingKey, context, updatedPartitionInfo)
);
agileContext.flush();
mergeIndexWithContext(groupingKey, partitionId, agilityContext);
} finally {
// Drain buffer index
LucenePartitionInfoProto.LucenePartitionInfo drainPartitionInfo =
lastPartitionInfo
.toBuilder()
.setMergingState(LucenePartitionInfoProto.LucenePartitionInfo.MergingState.DRAINING)
.clearDeleteKeys() // clear all delete keys
.build();

// je: Todo: handle as future

Check warning on line 217 in fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryManager.java

View check run for this annotation

fdb.teamscale.io / Teamscale | Findings

fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryManager.java#L217

Todo: handle as future https://fdb.teamscale.io/findings/details/foundationdb-fdb-record-layer?t=FORK_MR%2F3807%2Fjjezra%2Flucene_buffer_partition%3AHEAD&id=C629304E6F6652BD69ACEE3CE08C0A47
agilityContext.asyncToSync(FDBStoreTimer.Waits.WAIT_ONLINE_MERGE_INDEX,
agileContext.apply(context ->
// process deletes queue and set state to DRAINING in one transaction
partitioner.processPartitionDeleteKeys(partitionId, groupingKey, context)
.thenApply(ignore -> {
partitioner.setPartitionInfo(partitionId, groupingKey, context, drainPartitionInfo);
return null;

Check warning on line 224 in fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryManager.java

View check run for this annotation

fdb.teamscale.io / Teamscale | Findings

fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryManager.java#L224

Return in `finally` block https://fdb.teamscale.io/findings/details/foundationdb-fdb-record-layer?t=FORK_MR%2F3807%2Fjjezra%2Flucene_buffer_partition%3AHEAD&id=0DF66959BA65CDCD02AD1DAF805EF759
})));

Check warning on line 225 in fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryManager.java

View check run for this annotation

fdb.teamscale.io / Teamscale | Findings

fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryManager.java#L222-L225

Method always returns the same value (null) https://fdb.teamscale.io/findings/details/foundationdb-fdb-record-layer?t=FORK_MR%2F3807%2Fjjezra%2Flucene_buffer_partition%3AHEAD&id=3C04F1EF007B3D8FD3130309F3FC3DE1

Check warning on line 225 in fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryManager.java

View check run for this annotation

fdb.teamscale.io / Teamscale | Findings

fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryManager.java#L219-L225

Method always returns the same value (null) https://fdb.teamscale.io/findings/details/foundationdb-fdb-record-layer?t=FORK_MR%2F3807%2Fjjezra%2Flucene_buffer_partition%3AHEAD&id=7AF2DF1AA76F57598666C4137DCF7FAD
agileContext.flush();

// here: drain the buffer partition
// je: Todo: handle future and make use of agility context for (possibly) long operations

Check warning on line 229 in fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryManager.java

View check run for this annotation

fdb.teamscale.io / Teamscale | Findings

fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryManager.java#L229

Todo: handle future and make use of agility context for (possibly) long operations https://fdb.teamscale.io/findings/details/foundationdb-fdb-record-layer?t=FORK_MR%2F3807%2Fjjezra%2Flucene_buffer_partition%3AHEAD&id=89CA84F1A08DE8754C0EF832A05D9CA3
agilityContext.asyncToSync(FDBStoreTimer.Waits.WAIT_ONLINE_MERGE_INDEX,
agileContext.apply(context ->
partitioner.drainBufferPartitionAsync(partitionId, groupingKey, lastPartitionInfo, context)));

// Clear merging state
LucenePartitionInfoProto.LucenePartitionInfo normalPartitionInfo =
lastPartitionInfo
.toBuilder()
.setMergingState(LucenePartitionInfoProto.LucenePartitionInfo.MergingState.NORMAL)
.build();
agileContext.accept(context ->
partitioner.setPartitionInfo(partitionId, groupingKey, context, normalPartitionInfo)
);
// IndexWriter may release the file lock in a finally block in its own code, so if there is an error in its
// code, we need to commit. We could optimize this a bit, and have it only flush if it has committed anything
// but that should be rare.

agilityContext.flushAndClose();
}
}

private void mergeIndexNow(Tuple groupingKey, @Nullable final Integer partitionId) {
final AgilityContext agilityContext = getAgilityContext(true, true);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,13 @@ message LucenePartitionInfo {
required bytes to = 3;
/* count of documents in this partition */
required int32 count = 4;
/* merging state */
enum MergingState {
NORMAL = 0; // Normal state - no need to do anything special
MERGING = 1; // Merging state - query from both buffer partition + partition, insert new items to buffer partition, delete from both, maintain the delete list
DRAINING = 2; // Draining state - query from both, insert new items to buffer partition, delete from both, do not maintain delete list
}
optional MergingState merging_state = 5;
/* delete list - to be maintained during merge */
repeated bytes delete_keys = 6;
}
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,7 @@
assertNull(failedMerge.get());
assertThat(successfulMerges.get(), Matchers.greaterThan(10));
assertThat(conflicts.get(), Matchers.greaterThan(10));
assertThat(fileLockFailures.get(), Matchers.greaterThan(10));
// je: no lock failures assertThat(fileLockFailures.get(), Matchers.greaterThan(10));

Check warning on line 723 in fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java

View check run for this annotation

fdb.teamscale.io / Teamscale | Findings

fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java#L723

Commented Out Code https://fdb.teamscale.io/findings/details/foundationdb-fdb-record-layer?t=FORK_MR%2F3807%2Fjjezra%2Flucene_buffer_partition%3AHEAD&id=113760BEF91B66775FF16BF05DF1B44B
assertThat(docCount.get(), Matchers.greaterThanOrEqualTo(200));

// validate index is sane
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ void testDecrementCountNegative() throws Exception {
final LucenePartitioner partitioner = indexMaintainer.getPartitioner();
dataModel.groupingKeyToPrimaryKeyToPartitionKey.keySet().forEach(groupingKey -> {
final LucenePartitionInfoProto.LucenePartitionInfo firstPartition = partitioner.getAllPartitionMetaInfo(groupingKey).join().stream().findFirst().get();
Assertions.assertThatThrownBy(() -> partitioner.decrementCountAndSave(groupingKey, 5000, firstPartition.getId()).join())
Assertions.assertThatThrownBy(() -> partitioner.decrementCountAndSave(groupingKey, 5000, firstPartition.getId(), null).join())
.hasCauseInstanceOf(RecordCoreInternalException.class);
});
// Commit here to ensure that the data is not corrupt as a result
Expand Down Expand Up @@ -191,7 +191,7 @@ void testMergeNonEmptyPartitionFails(boolean isGrouped) throws Exception {
// Get the first partition (which has documents in the actual Lucene index)
final LucenePartitionInfoProto.LucenePartitionInfo partition = partitions.get(0);
// zero out the partition's count
partitioner.decrementCountAndSave(groupingKey, partition.getCount(), partition.getId());
partitioner.decrementCountAndSave(groupingKey, partition.getCount(), partition.getId(), null);
partitionsWithZeroCount.put(groupingKey, partition.getId());
});

Expand Down
Loading