Skip to content

Commit 8b727bf

Browse files
committed
Improve Geyser worker's ability to throw away stale memory account updates
1 parent fbe238f commit 8b727bf

File tree

1 file changed

+33
-0
lines changed

1 file changed

+33
-0
lines changed

geyser/handler_memory.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ type MemoryAccountWithDataUpdateHandler struct {
4242
lastSuccessfulSlotUpdateMu sync.RWMutex
4343
lastSuccessfulSlotUpdate map[string]uint64
4444

45+
highestQueuedSlotUpdateMu sync.RWMutex
46+
highestQueuedSlotUpdate map[string]uint64
47+
4548
backupWorkerInterval time.Duration
4649
}
4750

@@ -60,6 +63,7 @@ func NewMemoryAccountWithDataUpdateHandler(solanaClient solana.Client, ramStore
6063
observableVmAccounts: observableVmAccounts,
6164
cachedMemoryAccountState: make(map[string]map[int]*cachedVirtualAccount),
6265
lastSuccessfulSlotUpdate: make(map[string]uint64),
66+
highestQueuedSlotUpdate: make(map[string]uint64),
6367
backupWorkerInterval: backupWorkerInterval,
6468
}
6569
}
@@ -168,9 +172,38 @@ func (h *MemoryAccountWithDataUpdateHandler) onStateObserved(ctx context.Context
168172
}
169173
h.lastSuccessfulSlotUpdateMu.RUnlock()
170174

175+
// Check if there's an update for a higher slot
176+
h.highestQueuedSlotUpdateMu.Lock()
177+
highestQueuedSlotUpdate := h.highestQueuedSlotUpdate[base58MemoryAccountAddress]
178+
if observedAtSlot <= highestQueuedSlotUpdate {
179+
h.highestQueuedSlotUpdateMu.Unlock()
180+
return nil
181+
}
182+
h.highestQueuedSlotUpdate[base58MemoryAccountAddress] = observedAtSlot
183+
h.highestQueuedSlotUpdateMu.Unlock()
184+
171185
h.cachedMemoryAccountStateMu.Lock()
172186
defer h.cachedMemoryAccountStateMu.Unlock()
173187

188+
// Check (again after acquiring cached memory state mutex) if the state
189+
// is stale relative to the last successful update
190+
h.lastSuccessfulSlotUpdateMu.RLock()
191+
lastSuccessfulSlotUpdate = h.lastSuccessfulSlotUpdate[base58MemoryAccountAddress]
192+
if observedAtSlot <= lastSuccessfulSlotUpdate {
193+
h.lastSuccessfulSlotUpdateMu.RUnlock()
194+
return nil
195+
}
196+
h.lastSuccessfulSlotUpdateMu.RUnlock()
197+
198+
// Check (again after acquiring cached memory state mutex) if there's an
199+
// update queued for a higher slot
200+
h.highestQueuedSlotUpdateMu.RLock()
201+
highestQueuedSlotUpdate = h.highestQueuedSlotUpdate[base58MemoryAccountAddress]
202+
if observedAtSlot < highestQueuedSlotUpdate {
203+
return nil
204+
}
205+
h.highestQueuedSlotUpdateMu.RUnlock()
206+
174207
cachedState, ok := h.cachedMemoryAccountState[base58MemoryAccountAddress]
175208
if !ok {
176209
// Load entire memory account state from the DB into the cache

0 commit comments

Comments
 (0)