Skip to content

Commit fbe238f

Browse files
committed
Parallelize Geyser DB updates
1 parent 202406d commit fbe238f

File tree

1 file changed

+30
-22
lines changed

1 file changed

+30
-22
lines changed

geyser/handler_memory.go

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -292,35 +292,43 @@ func (h *MemoryAccountWithDataUpdateHandler) onStateObserved(ctx context.Context
292292
}
293293

294294
// Update the DB with the delta changes to the memory account state
295+
var wg sync.WaitGroup
295296
for _, dbUpdate := range dbUpdates {
296-
record := &ram.Record{
297-
Vm: base58VmAddress,
297+
wg.Add(1)
298298

299-
MemoryAccount: base58MemoryAccountAddress,
300-
Index: uint16(dbUpdate.Index),
301-
IsAllocated: dbUpdate.IsInitialized,
299+
go func(dbUpdate *cachedVirtualAccount) {
300+
defer wg.Done()
302301

303-
Slot: dbUpdate.Slot,
302+
record := &ram.Record{
303+
Vm: base58VmAddress,
304304

305-
LastUpdatedAt: time.Now(),
306-
}
305+
MemoryAccount: base58MemoryAccountAddress,
306+
Index: uint16(dbUpdate.Index),
307+
IsAllocated: dbUpdate.IsInitialized,
307308

308-
if dbUpdate.IsInitialized {
309-
record.Address = &dbUpdate.Address
310-
record.Type = &dbUpdate.Type
311-
record.Data = dbUpdate.State
312-
}
309+
Slot: dbUpdate.Slot,
313310

314-
err := h.ramStore.Save(ctx, record)
315-
switch err {
316-
case nil:
317-
h.cachedMemoryAccountState[base58MemoryAccountAddress][dbUpdate.Index] = dbUpdate
318-
case ram.ErrStaleState:
319-
// Should never happen given current locking structure
320-
default:
321-
log.WithError(err).Warn("failure updating db record")
322-
}
311+
LastUpdatedAt: time.Now(),
312+
}
313+
314+
if dbUpdate.IsInitialized {
315+
record.Address = &dbUpdate.Address
316+
record.Type = &dbUpdate.Type
317+
record.Data = dbUpdate.State
318+
}
319+
320+
err := h.ramStore.Save(ctx, record)
321+
switch err {
322+
case nil:
323+
h.cachedMemoryAccountState[base58MemoryAccountAddress][dbUpdate.Index] = dbUpdate
324+
case ram.ErrStaleState:
325+
// Should never happen given current locking structure
326+
default:
327+
log.WithError(err).Warn("failure updating db record")
328+
}
329+
}(dbUpdate)
323330
}
331+
wg.Wait()
324332

325333
h.lastSuccessfulSlotUpdateMu.Lock()
326334
lastSuccessfulSlotUpdate = h.lastSuccessfulSlotUpdate[base58MemoryAccountAddress]

0 commit comments

Comments
 (0)