Skip to content

Commit 389b0bf

Browse files
committed
refactor: remove callbacks
1 parent 7971241 commit 389b0bf

File tree

3 files changed

+53
-41
lines changed

3 files changed

+53
-41
lines changed

x/sync/db.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,11 @@ type DB interface {
2323
type ProofClient interface {
2424
merkledb.Clearer
2525
merkledb.MerkleRootGetter
26-
HandleRangeProofResponse(ctx context.Context, request *pb.SyncGetRangeProofRequest, responseBytes []byte, onFinish func(maybe.Maybe[[]byte])) error
26+
HandleRangeProofResponse(ctx context.Context, request *pb.SyncGetRangeProofRequest, responseBytes []byte) (maybe.Maybe[[]byte], error)
2727
HandleChangeProofResponse(
2828
ctx context.Context,
2929
request *pb.SyncGetChangeProofRequest,
3030
responseBytes []byte,
31-
onFinish func(maybe.Maybe[[]byte]),
32-
) error
33-
RegisterErrorHandler(handler func(error))
31+
) (maybe.Maybe[[]byte], error)
32+
Error() error
3433
}

x/sync/manager.go

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -174,9 +174,6 @@ func NewManager(config ManagerConfig, registerer prometheus.Registerer) (*Manage
174174
metrics: metrics,
175175
}
176176
m.unprocessedWorkCond.L = &m.workLock
177-
m.config.ProofClient.RegisterErrorHandler(func(err error) {
178-
m.setError(err)
179-
})
180177

181178
return m, nil
182179
}
@@ -219,6 +216,10 @@ func (m *Manager) sync(ctx context.Context) {
219216
switch {
220217
case ctx.Err() != nil:
221218
return // [m.workLock] released by defer.
219+
case m.config.ProofClient.Error() != nil:
220+
// If the proof client has an error, we can't continue.
221+
m.setError(m.config.ProofClient.Error())
222+
return
222223
case m.processingWorkItems >= m.config.SimultaneousWorkLimit:
223224
// We're already processing the maximum number of work items.
224225
// Wait until one of them finishes.
@@ -498,14 +499,17 @@ func (m *Manager) handleRangeProofResponse(
498499
return err
499500
}
500501

501-
return m.config.ProofClient.HandleRangeProofResponse(
502+
nextKey, err := m.config.ProofClient.HandleRangeProofResponse(
502503
ctx,
503504
request,
504505
responseBytes,
505-
func(largestHandledKey maybe.Maybe[[]byte]) {
506-
m.completeWorkItem(work, largestHandledKey, targetRootID)
507-
},
508506
)
507+
if err != nil {
508+
return err
509+
}
510+
511+
m.completeWorkItem(work, nextKey, targetRootID)
512+
return nil
509513
}
510514

511515
func (m *Manager) handleChangeProofResponse(
@@ -520,14 +524,17 @@ func (m *Manager) handleChangeProofResponse(
520524
return err
521525
}
522526

523-
return m.config.ProofClient.HandleChangeProofResponse(
527+
nextKey, err := m.config.ProofClient.HandleChangeProofResponse(
524528
ctx,
525529
request,
526530
responseBytes,
527-
func(largestHandledKey maybe.Maybe[[]byte]) {
528-
m.completeWorkItem(work, largestHandledKey, targetRootID)
529-
},
530531
)
532+
if err != nil {
533+
return err
534+
}
535+
536+
m.completeWorkItem(work, nextKey, targetRootID)
537+
return nil
531538
}
532539

533540
func (m *Manager) Error() error {

x/sync/merkledb_client.go

Lines changed: 32 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"context"
99
"fmt"
1010
"slices"
11+
"sync"
1112

1213
"golang.org/x/exp/maps"
1314
"google.golang.org/protobuf/proto"
@@ -31,7 +32,8 @@ type xSyncClient struct {
3132
db DB
3233
config *ClientConfig
3334
tokenSize int
34-
setError func(error)
35+
err error
36+
errorOnce sync.Once
3537
}
3638

3739
func NewClient(db DB, config *ClientConfig) (*xSyncClient, error) {
@@ -50,8 +52,14 @@ func NewClient(db DB, config *ClientConfig) (*xSyncClient, error) {
5052
}, nil
5153
}
5254

53-
func (c *xSyncClient) RegisterErrorHandler(handler func(error)) {
54-
c.setError = handler
55+
func (c *xSyncClient) Error() error {
56+
return c.err
57+
}
58+
59+
func (c *xSyncClient) setError(err error) {
60+
c.errorOnce.Do(func() {
61+
c.err = err
62+
})
5563
}
5664

5765
func (c *xSyncClient) Clear() error {
@@ -62,15 +70,15 @@ func (c *xSyncClient) GetMerkleRoot(ctx context.Context) (ids.ID, error) {
6270
return c.db.GetMerkleRoot(ctx)
6371
}
6472

65-
func (c *xSyncClient) HandleRangeProofResponse(ctx context.Context, request *pb.SyncGetRangeProofRequest, responseBytes []byte, onFinish func(maybe.Maybe[[]byte])) error {
73+
func (c *xSyncClient) HandleRangeProofResponse(ctx context.Context, request *pb.SyncGetRangeProofRequest, responseBytes []byte) (maybe.Maybe[[]byte], error) {
6674
var rangeProofProto pb.RangeProof
6775
if err := proto.Unmarshal(responseBytes, &rangeProofProto); err != nil {
68-
return err
76+
return maybe.Nothing[[]byte](), err
6977
}
7078

7179
var rangeProof merkledb.RangeProof
7280
if err := rangeProof.UnmarshalProto(&rangeProofProto); err != nil {
73-
return err
81+
return maybe.Nothing[[]byte](), err
7482
}
7583

7684
start := maybeBytesToMaybe(request.StartKey)
@@ -86,15 +94,15 @@ func (c *xSyncClient) HandleRangeProofResponse(ctx context.Context, request *pb.
8694
c.tokenSize,
8795
c.config.Hasher,
8896
); err != nil {
89-
return err
97+
return maybe.Nothing[[]byte](), err
9098
}
9199

92100
largestHandledKey := end
93101

94102
// Replace all the key-value pairs in the DB from start to end with values from the response.
95103
if err := c.db.CommitRangeProof(ctx, start, end, &rangeProof); err != nil {
96104
c.setError(err)
97-
return nil
105+
return maybe.Nothing[[]byte](), err
98106
}
99107

100108
if len(rangeProof.KeyChanges) > 0 {
@@ -107,24 +115,22 @@ func (c *xSyncClient) HandleRangeProofResponse(ctx context.Context, request *pb.
107115
nextKey, err := c.findNextKey(ctx, largestHandledKey.Value(), end, rangeProof.EndProof)
108116
if err != nil {
109117
c.setError(err)
110-
return nil
118+
return maybe.Nothing[[]byte](), err
111119
}
112120
largestHandledKey = nextKey
113121
}
114122

115-
onFinish(largestHandledKey)
116-
return nil
123+
return largestHandledKey, nil
117124
}
118125

119126
func (c *xSyncClient) HandleChangeProofResponse(
120127
ctx context.Context,
121128
request *pb.SyncGetChangeProofRequest,
122129
responseBytes []byte,
123-
onFinish func(maybe.Maybe[[]byte]),
124-
) error {
130+
) (maybe.Maybe[[]byte], error) {
125131
var changeProofResp pb.SyncGetChangeProofResponse
126132
if err := proto.Unmarshal(responseBytes, &changeProofResp); err != nil {
127-
return err
133+
return maybe.Nothing[[]byte](), err
128134
}
129135

130136
startKey := maybeBytesToMaybe(request.StartKey)
@@ -139,21 +145,21 @@ func (c *xSyncClient) HandleChangeProofResponse(
139145
// The server had enough history to send us a change proof
140146
var changeProof merkledb.ChangeProof
141147
if err := changeProof.UnmarshalProto(changeProofResp.ChangeProof); err != nil {
142-
return err
148+
return maybe.Nothing[[]byte](), err
143149
}
144150

145151
// Ensure the response does not contain more than the requested number of leaves
146152
// and the start and end roots match the requested roots.
147153
if len(changeProof.KeyChanges) > int(request.KeyLimit) {
148-
return fmt.Errorf(
154+
return maybe.Nothing[[]byte](), fmt.Errorf(
149155
"%w: (%d) > %d)",
150156
errTooManyKeys, len(changeProof.KeyChanges), request.KeyLimit,
151157
)
152158
}
153159

154160
endRoot, err := ids.ToID(request.EndRootHash)
155161
if err != nil {
156-
return err
162+
return maybe.Nothing[[]byte](), err
157163
}
158164

159165
if err := c.db.VerifyChangeProof(
@@ -163,15 +169,15 @@ func (c *xSyncClient) HandleChangeProofResponse(
163169
endKey,
164170
endRoot,
165171
); err != nil {
166-
return fmt.Errorf("%w due to %w", errInvalidChangeProof, err)
172+
return maybe.Nothing[[]byte](), fmt.Errorf("%w due to %w", errInvalidChangeProof, err)
167173
}
168174

169175
largestHandledKey = endKey
170176
// if the proof wasn't empty, apply changes to the sync DB
171177
if len(changeProof.KeyChanges) > 0 {
172178
if err := c.db.CommitChangeProof(ctx, &changeProof); err != nil {
173179
c.setError(err)
174-
return nil
180+
return maybe.Nothing[[]byte](), err
175181
}
176182
largestHandledKey = maybe.Some(changeProof.KeyChanges[len(changeProof.KeyChanges)-1].Key)
177183
}
@@ -180,7 +186,7 @@ func (c *xSyncClient) HandleChangeProofResponse(
180186
case *pb.SyncGetChangeProofResponse_RangeProof:
181187
var rangeProof merkledb.RangeProof
182188
if err := rangeProof.UnmarshalProto(changeProofResp.RangeProof); err != nil {
183-
return err
189+
return maybe.Nothing[[]byte](), err
184190
}
185191

186192
// The server did not have enough history to send us a change proof
@@ -195,22 +201,22 @@ func (c *xSyncClient) HandleChangeProofResponse(
195201
c.tokenSize,
196202
c.config.Hasher,
197203
); err != nil {
198-
return err
204+
return maybe.Nothing[[]byte](), err
199205
}
200206

201207
largestHandledKey = endKey
202208
if len(rangeProof.KeyChanges) > 0 {
203209
// Add all the key-value pairs we got to the database.
204210
if err := c.db.CommitRangeProof(ctx, startKey, endKey, &rangeProof); err != nil {
205211
c.setError(err)
206-
return nil
212+
return maybe.Nothing[[]byte](), err
207213
}
208214
largestHandledKey = maybe.Some(rangeProof.KeyChanges[len(rangeProof.KeyChanges)-1].Key)
209215
}
210216
endProof = rangeProof.EndProof
211217

212218
default:
213-
return fmt.Errorf(
219+
return maybe.Nothing[[]byte](), fmt.Errorf(
214220
"%w: %T",
215221
errUnexpectedChangeProofResponse, changeProofResp,
216222
)
@@ -221,12 +227,12 @@ func (c *xSyncClient) HandleChangeProofResponse(
221227
nextKey, err := c.findNextKey(ctx, largestHandledKey.Value(), endKey, endProof)
222228
if err != nil {
223229
c.setError(err)
224-
return nil
230+
return maybe.Nothing[[]byte](), err
225231
}
226232
largestHandledKey = nextKey
227233
}
228-
onFinish(largestHandledKey)
229-
return nil
234+
235+
return largestHandledKey, nil
230236
}
231237

232238
// findNextKey returns the start of the key range that should be fetched next

0 commit comments

Comments
 (0)