Skip to content

Commit 3fe23d4

Browse files
authored
Implement Geyser external deposit backup worker (#194)
1 parent 1460446 commit 3fe23d4

File tree

15 files changed

+761
-749
lines changed

15 files changed

+761
-749
lines changed

pkg/code/async/geyser/backup.go

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import (
77

88
"github.com/newrelic/go-agent/v3/newrelic"
99

10+
"github.com/code-payments/code-server/pkg/code/common"
11+
"github.com/code-payments/code-server/pkg/code/data/account"
1012
"github.com/code-payments/code-server/pkg/code/data/timelock"
1113
"github.com/code-payments/code-server/pkg/database/query"
1214
"github.com/code-payments/code-server/pkg/metrics"
@@ -123,9 +125,39 @@ func (p *service) backupExternalDepositWorker(serviceCtx context.Context, interv
123125
nr := serviceCtx.Value(metrics.NewRelicContextKey).(*newrelic.Application)
124126
m := nr.StartTransaction("async__geyser_consumer_service__backup_external_deposit_worker")
125127
defer m.End()
126-
// tracedCtx := newrelic.NewContext(serviceCtx, m)
128+
tracedCtx := newrelic.NewContext(serviceCtx, m)
129+
130+
accountInfoRecords, err := p.data.GetPrioritizedAccountInfosRequiringDepositSync(tracedCtx, 256)
131+
if err == account.ErrAccountInfoNotFound {
132+
return
133+
} else if err != nil {
134+
log.WithError(err).Warn("failed to get account info records")
135+
return
136+
}
137+
138+
var wg sync.WaitGroup
139+
for _, accountInfoRecord := range accountInfoRecords {
140+
wg.Add(1)
141+
142+
go func(accountInfoRecord *account.Record) {
143+
defer wg.Done()
144+
145+
authorityAccount, err := common.NewAccountFromPublicKeyString(accountInfoRecord.AuthorityAccount)
146+
if err != nil {
147+
log.WithError(err).Warn("invalid authority account")
148+
return
149+
}
150+
151+
log := log.WithField("authority", authorityAccount.PublicKey().ToBase58())
127152

128-
// todo: implement me
153+
err = fixMissingExternalDeposits(tracedCtx, p.data, p.vmIndexerClient, authorityAccount)
154+
if err != nil {
155+
log.WithError(err).Warn("failed to fix missing external deposits")
156+
}
157+
}(accountInfoRecord)
158+
}
159+
160+
wg.Wait()
129161
}()
130162
case <-serviceCtx.Done():
131163
return serviceCtx.Err()

pkg/code/async/geyser/config.go

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,11 @@ const (
2222
BackupTimelockWorkerIntervalConfigEnvName = envConfigPrefix + "BACKUP_TIMELOCK_WORKER_INTERVAL"
2323
defaultBackupTimelockWorkerInterval = 1 * time.Minute
2424

25-
BackupExternalDepositWorkerCountConfigEnvName = envConfigPrefix + "BACKUP_EXTERNAL_DEPOSIT_WORKER_COUNT"
26-
defaultBackupExternalDepositWorkerCount = 32
27-
2825
BackupExternalDepositWorkerIntervalConfigEnvName = envConfigPrefix + "BACKUP_EXTERNAL_DEPOSIT_WORKER_INTERVAL"
2926
defaultBackupExternalDepositWorkerInterval = 15 * time.Second
3027

31-
MessagingFeeCollectorPublicKeyConfigEnvName = envConfigPrefix + "MESSAGING_FEE_COLLECTOR_PUBLIC_KEY"
32-
defaultMessagingFeeCollectorPublicKey = "invalid" // ensure something valid is set
33-
3428
SwapSubsidizerPublicKeyConfigEnvName = envConfigPrefix + "SWAP_SUBSIDIZER_PUBLIC_KEY"
3529
defaultSwapSubsidizerPublicKey = "invalid" // ensure something valid is set
36-
37-
BackupMessagingWorkerIntervalConfigEnvName = envConfigPrefix + "BACKUP_MESSAGING_WORKER_INTERVAL"
38-
defaultBackupMessagingWorkerInterval = 15 * time.Minute // Decrease significantly once feature is live
3930
)
4031

4132
type conf struct {
@@ -44,14 +35,10 @@ type conf struct {
4435
programUpdateWorkerCount config.Uint64
4536
programUpdateQueueSize config.Uint64
4637

47-
backupExternalDepositWorkerCount config.Uint64
4838
backupExternalDepositWorkerInterval config.Duration
4939

5040
backupTimelockWorkerInterval config.Duration
5141

52-
messagingFeeCollectorPublicKey config.String
53-
backupMessagingWorkerInterval config.Duration
54-
5542
swapSubsidizerPublicKey config.String
5643
}
5744

@@ -67,14 +54,10 @@ func WithEnvConfigs() ConfigProvider {
6754
programUpdateWorkerCount: env.NewUint64Config(ProgramUpdateWorkerCountConfigEnvName, defaultProgramUpdateWorkerCount),
6855
programUpdateQueueSize: env.NewUint64Config(ProgramUpdateQueueSizeConfigEnvName, defaultProgramUpdateQueueSize),
6956

70-
backupExternalDepositWorkerCount: env.NewUint64Config(BackupExternalDepositWorkerCountConfigEnvName, defaultBackupExternalDepositWorkerCount),
7157
backupExternalDepositWorkerInterval: env.NewDurationConfig(BackupExternalDepositWorkerIntervalConfigEnvName, defaultBackupExternalDepositWorkerInterval),
7258

7359
backupTimelockWorkerInterval: env.NewDurationConfig(BackupTimelockWorkerIntervalConfigEnvName, defaultBackupTimelockWorkerInterval),
7460

75-
messagingFeeCollectorPublicKey: env.NewStringConfig(MessagingFeeCollectorPublicKeyConfigEnvName, defaultMessagingFeeCollectorPublicKey),
76-
backupMessagingWorkerInterval: env.NewDurationConfig(BackupMessagingWorkerIntervalConfigEnvName, defaultBackupMessagingWorkerInterval),
77-
7861
swapSubsidizerPublicKey: env.NewStringConfig(SwapSubsidizerPublicKeyConfigEnvName, defaultSwapSubsidizerPublicKey),
7962
}
8063
}

0 commit comments

Comments
 (0)