Skip to content

Commit 7f9bdab

Browse files
committed
Backup Geyser memory worker now pulls addresses from DB
1 parent 666d168 commit 7f9bdab

File tree

7 files changed

+116
-21
lines changed

7 files changed

+116
-21
lines changed

data/ram/memory/memory.go

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,14 +56,37 @@ func (s *store) Save(_ context.Context, data *ram.Record) error {
5656
return nil
5757
}
5858

59+
// GetAllMemoryAccounts implements ram.Store.GetAllMemoryAccounts
60+
func (s *store) GetAllMemoryAccounts(ctx context.Context, vm string) ([]string, error) {
61+
s.mu.Lock()
62+
defer s.mu.Unlock()
63+
64+
items := s.findByVm(vm)
65+
if len(items) == 0 {
66+
return nil, ram.ErrAccountNotFound
67+
}
68+
69+
uniqueMemoryAccounts := make(map[string]any)
70+
for _, item := range items {
71+
uniqueMemoryAccounts[item.MemoryAccount] = struct{}{}
72+
}
73+
74+
var res []string
75+
for memoryAccount := range uniqueMemoryAccounts {
76+
res = append(res, memoryAccount)
77+
}
78+
79+
return res, nil
80+
}
81+
5982
// GetAllByMemoryAccount implements ram.Store.GetAllByMemoryAccount
6083
func (s *store) GetAllByMemoryAccount(_ context.Context, memoryAccount string) ([]*ram.Record, error) {
6184
s.mu.Lock()
6285
defer s.mu.Unlock()
6386

6487
items := s.findByMemoryAccount(memoryAccount)
6588
if len(items) == 0 {
66-
return nil, ram.ErrNotFound
89+
return nil, ram.ErrItemNotFound
6790
}
6891
return cloneRecords(items), nil
6992
}
@@ -75,7 +98,7 @@ func (s *store) GetAllVirtualAccountsByAddressAndType(_ context.Context, address
7598

7699
items := s.findByAddressAndAccountType(address, accountType)
77100
if len(items) == 0 {
78-
return nil, ram.ErrNotFound
101+
return nil, ram.ErrItemNotFound
79102
}
80103
return cloneRecords(items), nil
81104
}
@@ -103,6 +126,16 @@ func (s *store) findByMemoryAccount(memoryAccount string) []*ram.Record {
103126
return res
104127
}
105128

129+
func (s *store) findByVm(vm string) []*ram.Record {
130+
var res []*ram.Record
131+
for _, item := range s.records {
132+
if item.Vm == vm {
133+
res = append(res, item)
134+
}
135+
}
136+
return res
137+
}
138+
106139
func (s *store) findByAddressAndAccountType(address string, accountType cvm.VirtualAccountType) []*ram.Record {
107140
var res []*ram.Record
108141
for _, item := range s.records {

data/ram/postgres/model.go

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,26 @@ func (m *model) dbSave(ctx context.Context, tableName string, db *sqlx.DB) error
133133
return pgutil.CheckNoRows(err, ram.ErrStaleState)
134134
}
135135

136+
func dbGetAllMemoryAccounts(ctx context.Context, tableName string, db *sqlx.DB, vm string) ([]string, error) {
137+
res := []string{}
138+
139+
query := `SELECT DISTINCT(memory_account) FROM ` + tableName + `
140+
WHERE vm = $1`
141+
142+
err := db.SelectContext(
143+
ctx,
144+
&res,
145+
query,
146+
vm,
147+
)
148+
if err != nil {
149+
return nil, pgutil.CheckNoRows(err, ram.ErrAccountNotFound)
150+
} else if len(res) == 0 {
151+
return nil, ram.ErrAccountNotFound
152+
}
153+
return res, nil
154+
}
155+
136156
func dbGetAllByMemoryAccount(ctx context.Context, tableName string, db *sqlx.DB, memoryAccount string) ([]*model, error) {
137157
res := []*model{}
138158

@@ -146,9 +166,9 @@ func dbGetAllByMemoryAccount(ctx context.Context, tableName string, db *sqlx.DB,
146166
memoryAccount,
147167
)
148168
if err != nil {
149-
return nil, pgutil.CheckNoRows(err, ram.ErrNotFound)
169+
return nil, pgutil.CheckNoRows(err, ram.ErrItemNotFound)
150170
} else if len(res) == 0 {
151-
return nil, ram.ErrNotFound
171+
return nil, ram.ErrItemNotFound
152172
}
153173
return res, nil
154174
}
@@ -167,9 +187,9 @@ func dbGetAllVirtualAccountsByAddressAndType(ctx context.Context, tableName stri
167187
accountType,
168188
)
169189
if err != nil {
170-
return nil, pgutil.CheckNoRows(err, ram.ErrNotFound)
190+
return nil, pgutil.CheckNoRows(err, ram.ErrItemNotFound)
171191
} else if len(res) == 0 {
172-
return nil, ram.ErrNotFound
192+
return nil, ram.ErrItemNotFound
173193
}
174194
return res, nil
175195
}

data/ram/postgres/store.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ func (s *store) Save(ctx context.Context, record *ram.Record) error {
4040
return nil
4141
}
4242

43+
// GetAllMemoryAccounts implements ram.Store.GetAllMemoryAccounts
44+
func (s *store) GetAllMemoryAccounts(ctx context.Context, vm string) ([]string, error) {
45+
return dbGetAllMemoryAccounts(ctx, s.tableName, s.db, vm)
46+
}
47+
4348
// GetAllByMemoryAccount implements ram.Store.GetAllByMemoryAccount
4449
func (s *store) GetAllByMemoryAccount(ctx context.Context, memoryAccount string) ([]*ram.Record, error) {
4550
models, err := dbGetAllByMemoryAccount(ctx, s.tableName, s.db, memoryAccount)

data/ram/store.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,18 @@ import (
88
)
99

1010
var (
11-
ErrStaleState = errors.New("memory item state is stale")
12-
ErrNotFound = errors.New("memory item not found")
11+
ErrAccountNotFound = errors.New("memory account not found")
12+
ErrItemNotFound = errors.New("memory item not found")
13+
ErrStaleState = errors.New("memory item state is stale")
1314
)
1415

1516
type Store interface {
1617
// Save updates the database record for a piece of allocated memory
1718
Save(ctx context.Context, record *Record) error
1819

20+
// GetAllMemoryAccounts gets all unique memory account addresses
21+
GetAllMemoryAccounts(ctx context.Context, vm string) ([]string, error)
22+
1923
// GetAllByMemoryAccount gets all database records for a given memory account
2024
GetAllByMemoryAccount(ctx context.Context, memoryAccount string) ([]*Record, error)
2125

data/ram/tests/tests.go

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
func RunTests(t *testing.T, s ram.Store, teardown func()) {
1717
for _, tf := range []func(t *testing.T, s ram.Store){
1818
testRoundTrip,
19+
testGetAllMemoryAccounts,
1920
testGetAllByMemoryAccount,
2021
testGetAllVirtualAccountsByAddressAndType,
2122
} {
@@ -33,10 +34,10 @@ func testRoundTrip(t *testing.T, s ram.Store) {
3334
accountType := cvm.VirtualAccountTypeTimelock
3435

3536
_, err := s.GetAllVirtualAccountsByAddressAndType(ctx, address, accountType)
36-
assert.Equal(t, ram.ErrNotFound, err)
37+
assert.Equal(t, ram.ErrItemNotFound, err)
3738

3839
_, err = s.GetAllByMemoryAccount(ctx, memoryAccount)
39-
assert.Equal(t, ram.ErrNotFound, err)
40+
assert.Equal(t, ram.ErrItemNotFound, err)
4041

4142
start := time.Now()
4243

@@ -91,7 +92,7 @@ func testRoundTrip(t *testing.T, s ram.Store) {
9192
require.NoError(t, s.Save(ctx, expected))
9293

9394
_, err = s.GetAllVirtualAccountsByAddressAndType(ctx, address, accountType)
94-
assert.Equal(t, ram.ErrNotFound, err)
95+
assert.Equal(t, ram.ErrItemNotFound, err)
9596

9697
actual, err = s.GetAllByMemoryAccount(ctx, memoryAccount)
9798
require.NoError(t, err)
@@ -100,6 +101,32 @@ func testRoundTrip(t *testing.T, s ram.Store) {
100101
})
101102
}
102103

104+
func testGetAllMemoryAccounts(t *testing.T, s ram.Store) {
105+
t.Run("testGetAllMemoryAccounts", func(t *testing.T) {
106+
ctx := context.Background()
107+
108+
_, err := s.GetAllMemoryAccounts(ctx, "vm2")
109+
assert.Equal(t, ram.ErrAccountNotFound, err)
110+
111+
for i := 0; i < 3; i++ {
112+
require.NoError(t, s.Save(ctx, &ram.Record{
113+
Vm: fmt.Sprintf("vm%d", i),
114+
115+
MemoryAccount: fmt.Sprintf("memory_account_%d", i),
116+
Index: 12345,
117+
IsAllocated: false,
118+
119+
Slot: 12345,
120+
}))
121+
}
122+
123+
actual, err := s.GetAllMemoryAccounts(ctx, "vm2")
124+
require.NoError(t, err)
125+
require.Len(t, actual, 1)
126+
assert.Equal(t, "memory_account_2", actual[0])
127+
})
128+
}
129+
103130
func testGetAllByMemoryAccount(t *testing.T, s ram.Store) {
104131
t.Run("testGetAllByMemoryAccount", func(t *testing.T) {
105132
ctx := context.Background()

geyser/handler_memory.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -103,13 +103,19 @@ func (h *MemoryAccountWithDataUpdateHandler) backupWorker(ctx context.Context) e
103103
for {
104104
select {
105105
case <-time.After(h.backupWorkerInterval):
106-
addresses := make([]string, 0)
107-
108-
h.cachedMemoryAccountStateMu.RLock()
109-
for address := range h.cachedMemoryAccountState {
110-
addresses = append(addresses, address)
106+
var addresses []string
107+
for vm := range h.observableVmAccounts {
108+
log := log.WithField("vm", vm)
109+
110+
addressesByVm, err := h.ramStore.GetAllMemoryAccounts(ctx, vm)
111+
switch err {
112+
case nil:
113+
addresses = append(addresses, addressesByVm...)
114+
case ram.ErrAccountNotFound:
115+
default:
116+
log.WithError(err).Warn("failure getting memory account addresses by vm")
117+
}
111118
}
112-
h.cachedMemoryAccountStateMu.RUnlock()
113119

114120
for _, address := range addresses {
115121
log := log.WithField("address", address)
@@ -228,7 +234,7 @@ func (h *MemoryAccountWithDataUpdateHandler) onStateObserved(ctx context.Context
228234

229235
cachedState[cachedVirtualAccountState.Index] = cachedVirtualAccountState
230236
}
231-
case ram.ErrNotFound:
237+
case ram.ErrItemNotFound:
232238
default:
233239
log.WithError(err).Warn("failure loading memory account state from db")
234240
return err

rpc/server.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func (s *server) GetVirtualTimelockAccounts(ctx context.Context, req *indexerpb.
4040
})
4141

4242
records, err := s.ramStore.GetAllVirtualAccountsByAddressAndType(ctx, base58.Encode(req.Owner.Value), cvm.VirtualAccountTypeTimelock)
43-
if err == ram.ErrNotFound {
43+
if err == ram.ErrItemNotFound {
4444
return &indexerpb.GetVirtualTimelockAccountsResponse{
4545
Result: indexerpb.GetVirtualTimelockAccountsResponse_NOT_FOUND,
4646
}, nil
@@ -101,7 +101,7 @@ func (s *server) GetVirtualDurableNonce(ctx context.Context, req *indexerpb.GetV
101101
})
102102

103103
records, err := s.ramStore.GetAllVirtualAccountsByAddressAndType(ctx, base58.Encode(req.Address.Value), cvm.VirtualAccountTypeDurableNonce)
104-
if err == ram.ErrNotFound {
104+
if err == ram.ErrItemNotFound {
105105
return &indexerpb.GetVirtualDurableNonceResponse{
106106
Result: indexerpb.GetVirtualDurableNonceResponse_NOT_FOUND,
107107
}, nil
@@ -157,7 +157,7 @@ func (s *server) GetVirtualRelayAccount(ctx context.Context, req *indexerpb.GetV
157157
})
158158

159159
records, err := s.ramStore.GetAllVirtualAccountsByAddressAndType(ctx, base58.Encode(req.Address.Value), cvm.VirtualAccountTypeRelay)
160-
if err == ram.ErrNotFound {
160+
if err == ram.ErrItemNotFound {
161161
return &indexerpb.GetVirtualRelayAccountResponse{
162162
Result: indexerpb.GetVirtualRelayAccountResponse_NOT_FOUND,
163163
}, nil

0 commit comments

Comments
 (0)