Skip to content

Commit 6936bbb

Browse files
committed
refactor: decouple work enqueuing from proofs
1 parent c59a07b commit 6936bbb

File tree

2 files changed

+86
-26
lines changed

2 files changed

+86
-26
lines changed

x/sync/manager.go

Lines changed: 37 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ func (m *Manager) requestChangeProof(ctx context.Context, work *workItem) {
327327

328328
if work.localRootID == targetRootID {
329329
// Start root is the same as the end root, so we're done.
330-
m.completeWorkItem(ctx, work, work.end, targetRootID, nil)
330+
m.completeWorkItem(work, work.end, targetRootID)
331331
m.finishWorkItem()
332332
return
333333
}
@@ -342,7 +342,7 @@ func (m *Manager) requestChangeProof(ctx context.Context, work *workItem) {
342342
return
343343
}
344344
work.start = maybe.Nothing[[]byte]()
345-
m.completeWorkItem(ctx, work, maybe.Nothing[[]byte](), targetRootID, nil)
345+
m.completeWorkItem(work, maybe.Nothing[[]byte](), targetRootID)
346346
return
347347
}
348348

@@ -401,7 +401,7 @@ func (m *Manager) requestRangeProof(ctx context.Context, work *workItem) {
401401
return
402402
}
403403
work.start = maybe.Nothing[[]byte]()
404-
m.completeWorkItem(ctx, work, maybe.Nothing[[]byte](), targetRootID, nil)
404+
m.completeWorkItem(work, maybe.Nothing[[]byte](), targetRootID)
405405
return
406406
}
407407

@@ -545,7 +545,17 @@ func (m *Manager) handleRangeProofResponse(
545545
largestHandledKey = maybe.Some(rangeProof.KeyChanges[len(rangeProof.KeyChanges)-1].Key)
546546
}
547547

548-
m.completeWorkItem(ctx, work, largestHandledKey, targetRootID, rangeProof.EndProof)
548+
// Find the next key to fetch.
549+
// If this is empty, then we have no more keys to fetch.
550+
if !largestHandledKey.IsNothing() {
551+
nextKey, err := m.findNextKey(ctx, largestHandledKey.Value(), work.end, rangeProof.EndProof)
552+
if err != nil {
553+
m.setError(err)
554+
return nil
555+
}
556+
largestHandledKey = nextKey
557+
}
558+
m.completeWorkItem(work, largestHandledKey, targetRootID)
549559
return nil
550560
}
551561

@@ -569,6 +579,10 @@ func (m *Manager) handleChangeProofResponse(
569579
startKey := maybeBytesToMaybe(request.StartKey)
570580
endKey := maybeBytesToMaybe(request.EndKey)
571581

582+
var (
583+
largestHandledKey maybe.Maybe[[]byte]
584+
endProof []merkledb.ProofNode
585+
)
572586
switch changeProofResp := changeProofResp.Response.(type) {
573587
case *pb.SyncGetChangeProofResponse_ChangeProof:
574588
// The server had enough history to send us a change proof
@@ -601,7 +615,7 @@ func (m *Manager) handleChangeProofResponse(
601615
return fmt.Errorf("%w due to %w", errInvalidChangeProof, err)
602616
}
603617

604-
largestHandledKey := work.end
618+
largestHandledKey = work.end
605619
// if the proof wasn't empty, apply changes to the sync DB
606620
if len(changeProof.KeyChanges) > 0 {
607621
if err := m.config.DB.CommitChangeProof(ctx, &changeProof); err != nil {
@@ -610,8 +624,8 @@ func (m *Manager) handleChangeProofResponse(
610624
}
611625
largestHandledKey = maybe.Some(changeProof.KeyChanges[len(changeProof.KeyChanges)-1].Key)
612626
}
627+
endProof = changeProof.EndProof
613628

614-
m.completeWorkItem(ctx, work, largestHandledKey, targetRootID, changeProof.EndProof)
615629
case *pb.SyncGetChangeProofResponse_RangeProof:
616630
var rangeProof merkledb.RangeProof
617631
if err := rangeProof.UnmarshalProto(changeProofResp.RangeProof); err != nil {
@@ -633,7 +647,7 @@ func (m *Manager) handleChangeProofResponse(
633647
return err
634648
}
635649

636-
largestHandledKey := work.end
650+
largestHandledKey = work.end
637651
if len(rangeProof.KeyChanges) > 0 {
638652
// Add all the key-value pairs we got to the database.
639653
if err := m.config.DB.CommitRangeProof(ctx, work.start, work.end, &rangeProof); err != nil {
@@ -642,14 +656,25 @@ func (m *Manager) handleChangeProofResponse(
642656
}
643657
largestHandledKey = maybe.Some(rangeProof.KeyChanges[len(rangeProof.KeyChanges)-1].Key)
644658
}
659+
endProof = rangeProof.EndProof
645660

646-
m.completeWorkItem(ctx, work, largestHandledKey, targetRootID, rangeProof.EndProof)
647661
default:
648662
return fmt.Errorf(
649663
"%w: %T",
650664
errUnexpectedChangeProofResponse, changeProofResp,
651665
)
652666
}
667+
// Find the next key to fetch.
668+
// If this is empty, then we have no more keys to fetch.
669+
if !largestHandledKey.IsNothing() {
670+
nextKey, err := m.findNextKey(ctx, largestHandledKey.Value(), work.end, endProof)
671+
if err != nil {
672+
m.setError(err)
673+
return nil
674+
}
675+
largestHandledKey = nextKey
676+
}
677+
m.completeWorkItem(work, largestHandledKey, targetRootID)
653678

654679
return nil
655680
}
@@ -933,28 +958,14 @@ func (m *Manager) setError(err error) {
933958
// that gave us the range up to and including [largestHandledKey].
934959
//
935960
// Assumes [m.workLock] is not held.
936-
func (m *Manager) completeWorkItem(ctx context.Context, work *workItem, largestHandledKey maybe.Maybe[[]byte], rootID ids.ID, proofOfLargestKey []merkledb.ProofNode) {
961+
func (m *Manager) completeWorkItem(work *workItem, largestHandledKey maybe.Maybe[[]byte], rootID ids.ID) {
937962
if !maybe.Equal(largestHandledKey, work.end, bytes.Equal) {
938-
// The largest handled key isn't equal to the end of the work item.
939-
// Find the start of the next key range to fetch.
940-
// Note that [largestHandledKey] can't be Nothing.
941-
// Proof: Suppose it is. That means that we got a range/change proof that proved up to the
942-
// greatest key-value pair in the database. That means we requested a proof with no upper
943-
// bound. That is, [workItem.end] is Nothing. Since we're here, [bothNothing] is false,
944-
// which means [workItem.end] isn't Nothing. Contradiction.
945-
nextStartKey, err := m.findNextKey(ctx, largestHandledKey.Value(), work.end, proofOfLargestKey)
946-
if err != nil {
947-
m.setError(err)
948-
return
949-
}
950-
951-
// nextStartKey being Nothing indicates that the entire range has been completed
952-
if nextStartKey.IsNothing() {
963+
// largestHandledKey being Nothing indicates that the entire range has been completed
964+
if largestHandledKey.IsNothing() {
953965
largestHandledKey = work.end
954966
} else {
955967
// the full range wasn't completed, so enqueue a new work item for the range [nextStartKey, workItem.end]
956-
m.enqueueWork(newWorkItem(work.localRootID, nextStartKey, work.end, work.priority, time.Now()))
957-
largestHandledKey = nextStartKey
968+
m.enqueueWork(newWorkItem(work.localRootID, largestHandledKey, work.end, work.priority, time.Now()))
958969
}
959970
}
960971

x/sync/sync_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,55 @@ func Test_Sync_FindNextKey_ExtraValues(t *testing.T) {
422422
require.True(isPrefix(midPointVal, nextKey.Value()))
423423
}
424424

425+
func Test_Sync_FindNextKey_IdenticalKeys(t *testing.T) {
426+
require := require.New(t)
427+
428+
db, err := merkledb.New(
429+
context.Background(),
430+
memdb.New(),
431+
newDefaultDBConfig(),
432+
)
433+
require.NoError(err)
434+
435+
testKeys := [][]byte{
436+
{0x10},
437+
{0x11, 0x11},
438+
{0x12, 0x34},
439+
{0x15},
440+
}
441+
442+
for i, key := range testKeys {
443+
value := []byte{byte(i + 1)}
444+
require.NoError(db.Put(key, value))
445+
}
446+
447+
targetRoot, err := db.GetMerkleRoot(context.Background())
448+
require.NoError(err)
449+
450+
// Get the proof for the test key
451+
testKey := []byte{0x11, 0x11}
452+
proof, err := db.GetRangeProof(context.Background(), maybe.Some(testKey), maybe.Some(testKey), 1)
453+
require.NoError(err)
454+
455+
ctx := context.Background()
456+
syncer, err := NewManager(ManagerConfig{
457+
DB: db,
458+
RangeProofClient: p2ptest.NewSelfClient(t, ctx, ids.EmptyNodeID, NewGetRangeProofHandler(db)),
459+
ChangeProofClient: p2ptest.NewSelfClient(t, ctx, ids.EmptyNodeID, NewGetChangeProofHandler(db)),
460+
TargetRoot: targetRoot,
461+
SimultaneousWorkLimit: 5,
462+
Log: logging.NoLog{},
463+
BranchFactor: merkledb.BranchFactor16,
464+
}, prometheus.NewRegistry())
465+
require.NoError(err)
466+
467+
// Since both keys are identical, the next key should be nothing, since the range is complete
468+
nextKey, err := syncer.findNextKey(context.Background(), testKey, maybe.Some([]byte{0x11, 0x11}), proof.EndProof)
469+
require.NoError(err)
470+
471+
require.Equal(maybe.Nothing[[]byte](), nextKey)
472+
}
473+
425474
func TestFindNextKeyEmptyEndProof(t *testing.T) {
426475
require := require.New(t)
427476
now := time.Now().UnixNano()

0 commit comments

Comments
 (0)