Skip to content

Commit 8c96823

Browse files
committed
Add Geyser app integration for notifications
1 parent 607d456 commit 8c96823

File tree

5 files changed

+36
-11
lines changed

5 files changed

+36
-11
lines changed

pkg/code/async/geyser/backup.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ func (p *service) backupExternalDepositWorker(serviceCtx context.Context, interv
150150

151151
log := log.WithField("authority", authorityAccount.PublicKey().ToBase58())
152152

153-
err = fixMissingExternalDeposits(tracedCtx, p.data, p.vmIndexerClient, authorityAccount)
153+
err = fixMissingExternalDeposits(tracedCtx, p.data, p.vmIndexerClient, p.integration, authorityAccount)
154154
if err != nil {
155155
log.WithError(err).Warn("failed to fix missing external deposits")
156156
}

pkg/code/async/geyser/external_deposit.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ var (
3838
syncedDepositCache = cache.NewCache(1_000_000)
3939
)
4040

41-
func fixMissingExternalDeposits(ctx context.Context, data code_data.Provider, vmIndexerClient indexerpb.IndexerClient, userAuthority *common.Account) error {
41+
func fixMissingExternalDeposits(ctx context.Context, data code_data.Provider, vmIndexerClient indexerpb.IndexerClient, integration Integration, userAuthority *common.Account) error {
4242
err := maybeInitiateExternalDepositIntoVm(ctx, data, vmIndexerClient, userAuthority)
4343
if err != nil {
4444
return errors.Wrap(err, "error depositing into the vm")
@@ -51,7 +51,7 @@ func fixMissingExternalDeposits(ctx context.Context, data code_data.Provider, vm
5151

5252
var anyError error
5353
for _, signature := range signatures {
54-
err := processPotentialExternalDepositIntoVm(ctx, data, signature, userAuthority)
54+
err := processPotentialExternalDepositIntoVm(ctx, data, integration, signature, userAuthority)
5555
if err != nil {
5656
anyError = errors.Wrap(err, "error processing signature for external deposit into vm")
5757
}
@@ -202,7 +202,7 @@ func findPotentialExternalDepositsIntoVm(ctx context.Context, data code_data.Pro
202202
}
203203
}
204204

205-
func processPotentialExternalDepositIntoVm(ctx context.Context, data code_data.Provider, signature string, userAuthority *common.Account) error {
205+
func processPotentialExternalDepositIntoVm(ctx context.Context, data code_data.Provider, integration Integration, signature string, userAuthority *common.Account) error {
206206
vmDepositAta, err := userAuthority.ToVmDepositAssociatedTokenAccount(common.CodeVmAccount, common.CoreMintAccount)
207207
if err != nil {
208208
return errors.Wrap(err, "error getting vm deposit ata")
@@ -276,6 +276,11 @@ func processPotentialExternalDepositIntoVm(ctx context.Context, data code_data.P
276276
return nil
277277
}
278278

279+
ownerAccount, err := common.NewAccountFromPublicKeyString(accountInfoRecord.OwnerAccount)
280+
if err != nil {
281+
return errors.Wrap(err, "invalid owner account")
282+
}
283+
279284
usdExchangeRecord, err := data.GetExchangeRate(ctx, currency_lib.USD, time.Now())
280285
if err != nil {
281286
return errors.Wrap(err, "error getting usd rate")
@@ -287,7 +292,7 @@ func processPotentialExternalDepositIntoVm(ctx context.Context, data code_data.P
287292
IntentId: getExternalDepositIntentID(signature, userVirtualTimelockVaultAccount),
288293
IntentType: intent.ExternalDeposit,
289294

290-
InitiatorOwnerAccount: accountInfoRecord.OwnerAccount,
295+
InitiatorOwnerAccount: ownerAccount.PublicKey().ToBase58(),
291296

292297
ExternalDepositMetadata: &intent.ExternalDepositMetadata{
293298
DestinationTokenAccount: userVirtualTimelockVaultAccount.PublicKey().ToBase58(),
@@ -322,6 +327,9 @@ func processPotentialExternalDepositIntoVm(ctx context.Context, data code_data.P
322327

323328
syncedDepositCache.Insert(cacheKey, true, 1)
324329

330+
// Best-effort processing for notification back to the user
331+
integration.OnDepositReceived(ctx, ownerAccount, uint64(deltaQuarksIntoOmnibus))
332+
325333
return nil
326334
default:
327335
syncedDepositCache.Insert(cacheKey, true, 1)

pkg/code/async/geyser/handler.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,15 @@ type TokenProgramAccountHandler struct {
3131
conf *conf
3232
data code_data.Provider
3333
vmIndexerClient indexerpb.IndexerClient
34+
integration Integration
3435
}
3536

36-
func NewTokenProgramAccountHandler(conf *conf, data code_data.Provider, vmIndexerClient indexerpb.IndexerClient) ProgramAccountUpdateHandler {
37+
func NewTokenProgramAccountHandler(conf *conf, data code_data.Provider, vmIndexerClient indexerpb.IndexerClient, integration Integration) ProgramAccountUpdateHandler {
3738
return &TokenProgramAccountHandler{
3839
conf: conf,
3940
data: data,
4041
vmIndexerClient: vmIndexerClient,
42+
integration: integration,
4143
}
4244
}
4345

@@ -97,7 +99,7 @@ func (h *TokenProgramAccountHandler) Handle(ctx context.Context, update *geyserp
9799
return nil
98100
}
99101

100-
err = processPotentialExternalDepositIntoVm(ctx, h.data, signature, userAuthorityAccount)
102+
err = processPotentialExternalDepositIntoVm(ctx, h.data, h.integration, signature, userAuthorityAccount)
101103
if err != nil {
102104
return errors.Wrap(err, "error processing signature for external deposit into vm")
103105
}
@@ -116,8 +118,8 @@ func (h *TokenProgramAccountHandler) Handle(ctx context.Context, update *geyserp
116118
}
117119
}
118120

119-
func initializeProgramAccountUpdateHandlers(conf *conf, data code_data.Provider, vmIndexerClient indexerpb.IndexerClient) map[string]ProgramAccountUpdateHandler {
121+
func initializeProgramAccountUpdateHandlers(conf *conf, data code_data.Provider, vmIndexerClient indexerpb.IndexerClient, integration Integration) map[string]ProgramAccountUpdateHandler {
120122
return map[string]ProgramAccountUpdateHandler{
121-
base58.Encode(token.ProgramKey): NewTokenProgramAccountHandler(conf, data, vmIndexerClient),
123+
base58.Encode(token.ProgramKey): NewTokenProgramAccountHandler(conf, data, vmIndexerClient, integration),
122124
}
123125
}

pkg/code/async/geyser/integration.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package async_geyser
2+
3+
import (
4+
"context"
5+
6+
"github.com/code-payments/code-server/pkg/code/common"
7+
)
8+
9+
// Integration allows for notifications based on events processed by Geyser
10+
type Integration interface {
11+
OnDepositReceived(ctx context.Context, owner *common.Account, quarksReceived uint64) error
12+
}

pkg/code/async/geyser/service.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ type service struct {
2727
vmIndexerClient indexerpb.IndexerClient
2828
conf *conf
2929

30+
integration Integration
31+
3032
programUpdatesChan chan *geyserpb.SubscribeUpdateAccount
3133
programUpdateHandlers map[string]ProgramAccountUpdateHandler
3234

@@ -44,15 +46,16 @@ type service struct {
4446
backupExternalDepositWorkerStatus bool
4547
}
4648

47-
func New(data code_data.Provider, vmIndexerClient indexerpb.IndexerClient, configProvider ConfigProvider) async.Service {
49+
func New(data code_data.Provider, vmIndexerClient indexerpb.IndexerClient, integration Integration, configProvider ConfigProvider) async.Service {
4850
conf := configProvider()
4951
return &service{
5052
log: logrus.StandardLogger().WithField("service", "geyser_consumer"),
5153
data: data,
5254
vmIndexerClient: vmIndexerClient,
5355
conf: configProvider(),
56+
integration: integration,
5457
programUpdatesChan: make(chan *geyserpb.SubscribeUpdateAccount, conf.programUpdateQueueSize.Get(context.Background())),
55-
programUpdateHandlers: initializeProgramAccountUpdateHandlers(conf, data, vmIndexerClient),
58+
programUpdateHandlers: initializeProgramAccountUpdateHandlers(conf, data, vmIndexerClient, integration),
5659
programUpdateWorkerMetrics: make(map[int]*eventWorkerMetrics),
5760
}
5861
}

0 commit comments

Comments
 (0)