Skip to content

Commit dea5273

Browse files
committed
Use the Yellowstone Geyser plugin
1 parent 819061c commit dea5273

13 files changed

+3979
-6553
lines changed

generated/geyser/v1/confirmed_block.pb.validate.go

Lines changed: 0 additions & 2415 deletions
This file was deleted.

generated/geyser/v1/geyser.pb.go

Lines changed: 2915 additions & 809 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

generated/geyser/v1/geyser.pb.validate.go

Lines changed: 0 additions & 2450 deletions
This file was deleted.

generated/geyser/v1/geyser_grpc.pb.go

Lines changed: 188 additions & 342 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

generated/geyser/v1/confirmed_block.pb.go renamed to generated/geyser/v1/solana-storage.pb.go

Lines changed: 553 additions & 362 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

geyser/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ const (
1313
GrpcPluginEndointConfigEnvName = envConfigPrefix + "GRPC_PLUGIN_ENDPOINT"
1414
defaultGrpcPluginEndoint = ""
1515

16+
GrpcPluginXTokenConfigEnvName = envConfigPrefix + "GRPC_PLUGIN_X_TOKEN"
17+
defaultGrpcPluginXToken = ""
18+
1619
ProgramUpdateWorkerCountConfigEnvName = envConfigPrefix + "PROGRAM_UPDATE_WORKER_COUNT"
1720
defaultProgramUpdateWorkerCount = 1024
1821

@@ -28,6 +31,7 @@ const (
2831

2932
type conf struct {
3033
grpcPluginEndpoint config.String
34+
grpcPluginXToken config.String
3135

3236
programUpdateWorkerCount config.Uint64
3337
programUpdateQueueSize config.Uint64
@@ -45,6 +49,7 @@ func WithEnvConfigs() ConfigProvider {
4549
return func() *conf {
4650
return &conf{
4751
grpcPluginEndpoint: env.NewStringConfig(GrpcPluginEndointConfigEnvName, defaultGrpcPluginEndoint),
52+
grpcPluginXToken: env.NewStringConfig(GrpcPluginXTokenConfigEnvName, defaultGrpcPluginXToken),
4853

4954
programUpdateWorkerCount: env.NewUint64Config(ProgramUpdateWorkerCountConfigEnvName, defaultProgramUpdateWorkerCount),
5055
programUpdateQueueSize: env.NewUint64Config(ProgramUpdateQueueSizeConfigEnvName, defaultProgramUpdateQueueSize),

geyser/consumer.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ func (w *Worker) consumeGeyserProgramUpdateEvents(ctx context.Context) error {
2020
default:
2121
}
2222

23-
err := w.subscribeToProgramUpdatesFromGeyser(ctx, w.conf.grpcPluginEndpoint.Get(ctx))
23+
err := w.subscribeToProgramUpdatesFromGeyser(ctx, w.conf.grpcPluginEndpoint.Get(ctx), w.conf.grpcPluginXToken.Get(ctx))
2424
if err != nil && !errors.Is(err, context.Canceled) {
2525
log.WithError(err).Warn("program update consumer unexpectedly terminated")
2626
}
@@ -44,16 +44,16 @@ func (w *Worker) programUpdateWorker(ctx context.Context, id int) {
4444

4545
for update := range w.programUpdatesChan {
4646
func() {
47-
base58PublicKey := base58.Encode(update.Pubkey)
48-
base58ProgramAddress := base58.Encode(update.Owner)
47+
base58PublicKey := base58.Encode(update.Account.Pubkey)
48+
base58ProgramAddress := base58.Encode(update.Account.Owner)
4949

5050
log := log.WithFields(logrus.Fields{
5151
"account": base58PublicKey,
5252
"program": base58ProgramAddress,
5353
"slot": update.Slot,
5454
})
55-
if update.TxSignature != nil {
56-
log = log.WithField("transaction", *update.TxSignature)
55+
if len(update.Account.TxnSignature) > 0 {
56+
log = log.WithField("transaction", base58.Encode(update.Account.TxnSignature))
5757
}
5858

5959
handler, ok := w.programUpdateHandlers[base58ProgramAddress]

geyser/handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ type ProgramAccountUpdateHandler interface {
1717
// to come in order. Implementations must be idempotent and should not
1818
// trust the account data passed in. Always refer to finalized blockchain
1919
// state from another RPC provider.
20-
Handle(ctx context.Context, update *geyserpb.AccountUpdate) error
20+
Handle(ctx context.Context, update *geyserpb.SubscribeUpdateAccount) error
2121

2222
// RunBackupWorker runs the backup worker for the handler, which should
2323
// periodically fill any gaps of data due to missed real-time events from

geyser/handler_memory.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,14 +74,14 @@ func NewMemoryAccountWithDataUpdateHandler(solanaClient solana.Client, ramStore
7474
}
7575

7676
// Handle implements geyser.ProgramAccountUpdateHandler.Handle
77-
func (h *MemoryAccountWithDataUpdateHandler) Handle(ctx context.Context, update *geyserpb.AccountUpdate) error {
77+
func (h *MemoryAccountWithDataUpdateHandler) Handle(ctx context.Context, update *geyserpb.SubscribeUpdateAccount) error {
7878
// Simply fetch finalized account state as the safest option
7979
var finalizedData []byte
8080
var finalizedSlot uint64
8181
var err error
8282
_, err = retry.Retry(
8383
func() error {
84-
finalizedData, finalizedSlot, err = h.solanaClient.GetAccountDataAfterBlock(update.Pubkey, update.Slot)
84+
finalizedData, finalizedSlot, err = h.solanaClient.GetAccountDataAfterBlock(update.Account.Pubkey, update.Slot)
8585
return err
8686
},
8787
waitForFinalizationRetryStrategies...,
@@ -94,7 +94,7 @@ func (h *MemoryAccountWithDataUpdateHandler) Handle(ctx context.Context, update
9494
if err := state.Unmarshal(finalizedData); err != nil {
9595
return nil
9696
}
97-
return h.onStateObserved(ctx, update.Pubkey, finalizedSlot, &state)
97+
return h.onStateObserved(ctx, update.Account.Pubkey, finalizedSlot, &state)
9898
}
9999

100100
// RunBackupWorker implements geyser.ProgramAccountUpdateHandler.RunBackupWorker

geyser/server.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ type Worker struct {
1717
log *logrus.Entry
1818
conf *conf
1919

20-
programUpdatesChan chan *geyserpb.AccountUpdate
20+
programUpdatesChan chan *geyserpb.SubscribeUpdateAccount
2121
programUpdateHandlers map[string]ProgramAccountUpdateHandler
2222
}
2323

@@ -28,7 +28,7 @@ func NewWorker(ctx context.Context, solanaClient solana.Client, ramStore ram.Sto
2828
log: logrus.StandardLogger().WithField("type", "geyser/worker"),
2929
conf: conf,
3030

31-
programUpdatesChan: make(chan *geyserpb.AccountUpdate, conf.programUpdateQueueSize.Get(context.Background())),
31+
programUpdatesChan: make(chan *geyserpb.SubscribeUpdateAccount, conf.programUpdateQueueSize.Get(context.Background())),
3232
programUpdateHandlers: initializeProgramAccountUpdateHandlers(conf, solanaClient, ramStore),
3333
}
3434
}

geyser/subscription.go

Lines changed: 75 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -4,97 +4,107 @@ import (
44
"context"
55
"time"
66

7+
"github.com/mr-tron/base58"
78
"github.com/pkg/errors"
8-
9-
"github.com/code-payments/code-server/pkg/solana/cvm"
109
"google.golang.org/grpc"
1110
"google.golang.org/grpc/credentials/insecure"
11+
"google.golang.org/grpc/metadata"
1212

1313
geyserpb "github.com/code-payments/code-vm-indexer/generated/geyser/v1"
14+
15+
"github.com/code-payments/code-server/pkg/solana/cvm"
1416
)
1517

1618
const (
1719
defaultStreamSubscriptionTimeout = time.Minute
1820
)
1921

2022
var (
21-
ErrSubscriptionFallenBehind = errors.New("subscription stream fell behind")
22-
ErrTimeoutReceivingUpdate = errors.New("timed out receiving update")
23+
ErrTimeoutReceivingUpdate = errors.New("timed out receiving update")
2324
)
2425

25-
func newGeyserClient(ctx context.Context, endpoint string) (geyserpb.GeyserClient, error) {
26-
conn, err := grpc.Dial(endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))
26+
func newGeyserClient(endpoint, xToken string) (geyserpb.GeyserClient, error) {
27+
opts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
28+
if len(xToken) > 0 {
29+
opts = append(
30+
opts,
31+
grpc.WithUnaryInterceptor(newXTokenUnaryClientInterceptor(xToken)),
32+
grpc.WithStreamInterceptor(newXTokenStreamClientInterceptor(xToken)),
33+
)
34+
}
35+
36+
conn, err := grpc.NewClient(endpoint, opts...)
2737
if err != nil {
2838
return nil, err
2939
}
3040

3141
client := geyserpb.NewGeyserClient(conn)
3242

33-
// Unfortunately the RPCs we use no longer support hearbeats. We'll let each
34-
// individual subscriber determine what an appropriate timeout to receive a
35-
// message should be.
36-
/*
37-
heartbeatResp, err := client.GetHeartbeatInterval(ctx, &geyserpb.EmptyRequest{})
38-
if err != nil {
39-
return nil, 0, errors.Wrap(err, "error getting heartbeat interval")
40-
}
41-
42-
heartbeatTimeout := time.Duration(2 * heartbeatResp.HeartbeatIntervalMs * uint64(time.Millisecond))
43-
*/
44-
4543
return client, nil
4644
}
4745

48-
func boundedProgramUpdateRecv(ctx context.Context, streamer geyserpb.Geyser_SubscribeProgramUpdatesClient, timeout time.Duration) (update *geyserpb.TimestampedAccountUpdate, err error) {
46+
func boundedRecv(ctx context.Context, streamer geyserpb.Geyser_SubscribeClient, timeout time.Duration) (update *geyserpb.SubscribeUpdate, err error) {
4947
done := make(chan struct{})
5048
go func() {
5149
update, err = streamer.Recv()
5250
close(done)
5351
}()
5452

5553
select {
54+
case <-ctx.Done():
55+
return nil, ctx.Err()
5656
case <-time.After(timeout):
5757
return nil, ErrTimeoutReceivingUpdate
5858
case <-done:
5959
return update, err
6060
}
6161
}
6262

63-
func (w *Worker) subscribeToProgramUpdatesFromGeyser(ctx context.Context, endpoint string) error {
63+
func (w *Worker) subscribeToProgramUpdatesFromGeyser(ctx context.Context, endpoint, xToken string) error {
6464
log := w.log.WithField("method", "subscribeToProgramUpdatesFromGeyser")
6565
log.Debug("subscription started")
6666

6767
defer func() {
6868
log.Debug("subscription stopped")
6969
}()
7070

71-
client, err := newGeyserClient(ctx, endpoint)
71+
client, err := newGeyserClient(endpoint, xToken)
7272
if err != nil {
7373
return errors.Wrap(err, "error creating client")
7474
}
7575

76-
streamer, err := client.SubscribeProgramUpdates(ctx, &geyserpb.SubscribeProgramsUpdatesRequest{
77-
Programs: [][]byte{cvm.PROGRAM_ID},
78-
})
76+
streamer, err := client.Subscribe(ctx)
7977
if err != nil {
8078
return errors.Wrap(err, "error opening subscription stream")
8179
}
8280

81+
req := &geyserpb.SubscribeRequest{
82+
Accounts: make(map[string]*geyserpb.SubscribeRequestFilterAccounts),
83+
}
84+
req.Accounts["accounts_subscription"] = &geyserpb.SubscribeRequestFilterAccounts{
85+
Owner: []string{base58.Encode(cvm.PROGRAM_ID)},
86+
}
87+
finalizedCommitmentLevel := geyserpb.CommitmentLevel_FINALIZED
88+
req.Commitment = &finalizedCommitmentLevel
89+
err = streamer.Send(req)
90+
if err != nil {
91+
return errors.Wrap(err, "error sending subscription request")
92+
}
93+
8394
for {
84-
update, err := boundedProgramUpdateRecv(ctx, streamer, defaultStreamSubscriptionTimeout)
95+
update, err := boundedRecv(ctx, streamer, defaultStreamSubscriptionTimeout)
8596
if err != nil {
86-
return errors.Wrap(err, "error receiving update")
97+
return errors.Wrap(err, "error recieving update")
8798
}
8899

89-
messageAge := time.Since(update.Ts.AsTime())
90-
if messageAge > defaultStreamSubscriptionTimeout {
91-
log.WithField("message_age", messageAge).Warn(ErrSubscriptionFallenBehind.Error())
92-
return ErrSubscriptionFallenBehind
100+
accountUpdate := update.GetAccount()
101+
if accountUpdate == nil {
102+
continue
93103
}
94104

95105
// Ignore startup updates. We only care about real-time updates due to
96106
// transactions.
97-
if update.AccountUpdate.IsStartup {
107+
if accountUpdate.IsStartup {
98108
continue
99109
}
100110

@@ -103,9 +113,42 @@ func (w *Worker) subscribeToProgramUpdatesFromGeyser(ctx context.Context, endpoi
103113
// backing up the Geyser plugin, which kills this subscription and we end up
104114
// missing updates.
105115
select {
106-
case w.programUpdatesChan <- update.AccountUpdate:
116+
case w.programUpdatesChan <- accountUpdate:
107117
default:
108118
log.Warn("dropping update because queue is full")
109119
}
110120
}
111121
}
122+
123+
func newXTokenUnaryClientInterceptor(xToken string) grpc.UnaryClientInterceptor {
124+
return func(
125+
ctx context.Context,
126+
method string,
127+
req, reply interface{},
128+
cc *grpc.ClientConn,
129+
invoker grpc.UnaryInvoker,
130+
opts ...grpc.CallOption,
131+
) error {
132+
ctx = withXToken(ctx, xToken)
133+
return invoker(ctx, method, req, reply, cc, opts...)
134+
}
135+
}
136+
137+
func newXTokenStreamClientInterceptor(xToken string) grpc.StreamClientInterceptor {
138+
return func(
139+
ctx context.Context,
140+
desc *grpc.StreamDesc,
141+
cc *grpc.ClientConn,
142+
method string,
143+
streamer grpc.Streamer,
144+
opts ...grpc.CallOption,
145+
) (grpc.ClientStream, error) {
146+
ctx = withXToken(ctx, xToken)
147+
return streamer(ctx, desc, cc, method, opts...)
148+
}
149+
}
150+
151+
func withXToken(ctx context.Context, xToken string) context.Context {
152+
md := metadata.Pairs("x-token", xToken)
153+
return metadata.NewOutgoingContext(ctx, md)
154+
}

0 commit comments

Comments
 (0)