Skip to content

Balances are now resistant to outbound transfer races #199

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
filippo.io/edwards25519 v1.1.0
github.com/aws/aws-sdk-go-v2 v0.17.0
github.com/bits-and-blooms/bloom/v3 v3.1.0
github.com/code-payments/code-protobuf-api v1.19.1-0.20250605155512-63da5d11d58a
github.com/code-payments/code-protobuf-api v1.19.1-0.20250610140050-4cadbcc86f16
github.com/code-payments/code-vm-indexer v0.1.11-0.20241028132209-23031e814fba
github.com/emirpasic/gods v1.12.0
github.com/envoyproxy/protoc-gen-validate v1.2.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnht
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I=
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
github.com/code-payments/code-protobuf-api v1.19.1-0.20250605155512-63da5d11d58a h1:h5AFZjmn+Zzkkd0u2Y+h9msj7HYBOSI3l4i5CD0ls34=
github.com/code-payments/code-protobuf-api v1.19.1-0.20250605155512-63da5d11d58a/go.mod h1:ee6TzKbgMS42ZJgaFEMG3c4R3dGOiffHSu6MrY7WQvs=
github.com/code-payments/code-protobuf-api v1.19.1-0.20250610140050-4cadbcc86f16 h1:drAMKRdbyObW8E4H6xc1pKIDxoFYgpaTdMlEnIKBIJ0=
github.com/code-payments/code-protobuf-api v1.19.1-0.20250610140050-4cadbcc86f16/go.mod h1:ee6TzKbgMS42ZJgaFEMG3c4R3dGOiffHSu6MrY7WQvs=
github.com/code-payments/code-vm-indexer v0.1.11-0.20241028132209-23031e814fba h1:Bkp+gmeb6Y2PWXfkSCTMBGWkb2P1BujRDSjWeI+0j5I=
github.com/code-payments/code-vm-indexer v0.1.11-0.20241028132209-23031e814fba/go.mod h1:jSiifpiBpyBQ8q2R0MGEbkSgWC6sbdRTyDBntmW+j1E=
github.com/containerd/continuity v0.0.0-20190827140505-75bee3e2ccb6 h1:NmTXa/uVnDyp0TY5MKi197+3HWcnYWfnHGyaFthlnGw=
Expand Down
18 changes: 15 additions & 3 deletions pkg/code/async/account/gift_card.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

commonpb "github.com/code-payments/code-protobuf-api/generated/go/common/v1"

"github.com/code-payments/code-server/pkg/code/balance"
"github.com/code-payments/code-server/pkg/code/common"
code_data "github.com/code-payments/code-server/pkg/code/data"
"github.com/code-payments/code-server/pkg/code/data/account"
Expand Down Expand Up @@ -93,6 +94,12 @@ func (p *service) maybeInitiateGiftCardAutoReturn(ctx context.Context, accountIn
return err
}

balanceLock, err := balance.GetOptimisticVersionLock(ctx, p.data, giftCardVaultAccount)
if err != nil {
log.WithError(err).Warn("failure getting balance lock")
return err
}

_, err = p.data.GetGiftCardClaimedAction(ctx, giftCardVaultAccount.PublicKey().ToBase58())
if err == nil {
log.Trace("gift card is claimed and will be removed from worker queue")
Expand Down Expand Up @@ -124,7 +131,7 @@ func (p *service) maybeInitiateGiftCardAutoReturn(ctx context.Context, accountIn
// There's no action to claim the gift card and the expiry window has been met.
// It's time to initiate the process of auto-returning the funds back to the
// issuer.
err = InitiateProcessToAutoReturnGiftCard(ctx, p.data, giftCardVaultAccount, false)
err = InitiateProcessToAutoReturnGiftCard(ctx, p.data, giftCardVaultAccount, false, balanceLock)
if err != nil {
log.WithError(err).Warn("failure initiating process to return gift card balance to issuer")
return err
Expand All @@ -138,7 +145,7 @@ func (p *service) maybeInitiateGiftCardAutoReturn(ctx context.Context, accountIn
// a good guide for similar actions in the future.
//
// todo: This probably belongs somewhere more common
func InitiateProcessToAutoReturnGiftCard(ctx context.Context, data code_data.Provider, giftCardVaultAccount *common.Account, isVoidedByUser bool) error {
func InitiateProcessToAutoReturnGiftCard(ctx context.Context, data code_data.Provider, giftCardVaultAccount *common.Account, isVoidedByUser bool, balanceLock *balance.OptimisticVersionLock) error {
return data.ExecuteInTx(ctx, sql.LevelDefault, func(ctx context.Context) error {
giftCardIssuedIntent, err := data.GetOriginalGiftCardIssuedIntent(ctx, giftCardVaultAccount.PublicKey().ToBase58())
if err != nil {
Expand Down Expand Up @@ -199,7 +206,12 @@ func InitiateProcessToAutoReturnGiftCard(ctx context.Context, data code_data.Pro

// This will trigger the fulfillment worker to poll for the fulfillment. This
// should be the very last DB update called.
return markFulfillmentAsActivelyScheduled(ctx, data, autoReturnFulfillment[0])
err = markFulfillmentAsActivelyScheduled(ctx, data, autoReturnFulfillment[0])
if err != nil {
return err
}

return balanceLock.OnCommit(ctx, data)
})
}

Expand Down
102 changes: 49 additions & 53 deletions pkg/code/balance/calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,28 @@ type State struct {
current int64
}

// Calculate calculates a token account's balance using a starting point and a set
// of strategies. Each may be incomplete individually, but in total must form a
// complete balance calculation.
func Calculate(ctx context.Context, tokenAccount *common.Account, initialBalance uint64, strategies ...Strategy) (balance uint64, err error) {
balanceState := &State{
current: int64(initialBalance),
}

for _, strategy := range strategies {
balanceState, err = strategy(ctx, tokenAccount, balanceState)
if err != nil {
return 0, err
}
}

if balanceState.current < 0 {
return 0, ErrNegativeBalance
}

return uint64(balanceState.current), nil
}

// CalculateFromCache is the default and recommended strategy for reliably estimating
// a token account's balance using cached values.
//
Expand Down Expand Up @@ -168,28 +190,6 @@ func CalculateFromBlockchain(ctx context.Context, data code_data.Provider, token
return quarks, BlockchainSource, nil
}

// Calculate calculates a token account's balance using a starting point and a set
// of strategies. Each may be incomplete individually, but in total must form a
// complete balance calculation.
func Calculate(ctx context.Context, tokenAccount *common.Account, initialBalance uint64, strategies ...Strategy) (balance uint64, err error) {
balanceState := &State{
current: int64(initialBalance),
}

for _, strategy := range strategies {
balanceState, err = strategy(ctx, tokenAccount, balanceState)
if err != nil {
return 0, err
}
}

if balanceState.current < 0 {
return 0, ErrNegativeBalance
}

return uint64(balanceState.current), nil
}

// NetBalanceFromIntentActions is a balance calculation strategy that incorporates
// the net balance by applying payment intents to the current balance.
func NetBalanceFromIntentActions(ctx context.Context, data code_data.Provider) Strategy {
Expand Down Expand Up @@ -243,14 +243,39 @@ type BatchState struct {
current map[string]int64
}

// CalculateBatch calculates a set of token accounts' balance using a starting point
// and a set of strategies. Each may be incomplete individually, but in total must
// form a complete balance calculation.
func CalculateBatch(ctx context.Context, tokenAccounts []string, strategies ...BatchStrategy) (balanceByTokenAccount map[string]uint64, err error) {
balanceState := &BatchState{
current: make(map[string]int64),
}

for _, strategy := range strategies {
balanceState, err = strategy(ctx, tokenAccounts, balanceState)
if err != nil {
return nil, err
}
}

res := make(map[string]uint64)
for tokenAccount, balance := range balanceState.current {
if balance < 0 {
return nil, ErrNegativeBalance
}

res[tokenAccount] = uint64(balance)
}

return res, nil
}

// BatchCalculateFromCacheWithAccountRecords is the default and recommended batch strategy
// or reliably estimating a set of token accounts' balance when common.AccountRecords are
// available.
//
// Note: Use this method when calculating balances for accounts that are managed by
// Code (ie. Timelock account) and operate within the L2 system.
//
// Note: This only supports post-privacy accounts. Use CalculateFromCache instead.
func BatchCalculateFromCacheWithAccountRecords(ctx context.Context, data code_data.Provider, accountRecordsBatch ...*common.AccountRecords) (map[string]uint64, error) {
tracer := metrics.TraceMethodCall(ctx, metricsPackageName, "BatchCalculateFromCacheWithAccountRecords")
defer tracer.End()
Expand Down Expand Up @@ -279,8 +304,6 @@ func BatchCalculateFromCacheWithAccountRecords(ctx context.Context, data code_da
//
// Note: Use this method when calculating balances for accounts that are managed by
// Code (ie. Timelock account) and operate within the L2 system.
//
// Note: This only supports post-privacy accounts. Use CalculateFromCache instead.
func BatchCalculateFromCacheWithTokenAccounts(ctx context.Context, data code_data.Provider, tokenAccounts ...*common.Account) (map[string]uint64, error) {
tracer := metrics.TraceMethodCall(ctx, metricsPackageName, "BatchCalculateFromCacheWithTokenAccounts")
defer tracer.End()
Expand Down Expand Up @@ -333,33 +356,6 @@ func defaultBatchCalculationFromCache(ctx context.Context, data code_data.Provid
)
}

// CalculateBatch calculates a set of token accounts' balance using a starting point
// and a set of strategies. Each may be incomplete individually, but in total must
// form a complete balance calculation.
func CalculateBatch(ctx context.Context, tokenAccounts []string, strategies ...BatchStrategy) (balanceByTokenAccount map[string]uint64, err error) {
balanceState := &BatchState{
current: make(map[string]int64),
}

for _, strategy := range strategies {
balanceState, err = strategy(ctx, tokenAccounts, balanceState)
if err != nil {
return nil, err
}
}

res := make(map[string]uint64)
for tokenAccount, balance := range balanceState.current {
if balance < 0 {
return nil, ErrNegativeBalance
}

res[tokenAccount] = uint64(balance)
}

return res, nil
}

// NetBalanceFromIntentActionsBatch is a balance calculation strategy that incorporates
// the net balance by applying payment intents to the current balance.
func NetBalanceFromIntentActionsBatch(ctx context.Context, data code_data.Provider) BatchStrategy {
Expand Down
34 changes: 34 additions & 0 deletions pkg/code/balance/lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package balance

import (
"context"

"github.com/code-payments/code-server/pkg/code/common"
code_data "github.com/code-payments/code-server/pkg/code/data"
)

// OptimisticVersionLock is an optimistic version lock on an account's cached
// balance, which can be paired with DB updates against balances that need to
// be protected against race conditions.
type OptimisticVersionLock struct {
vault *common.Account
currentVersion uint64
}

// GetOptimisticVersionLock gets an optimistic version lock for the vault account's
// cached balance
func GetOptimisticVersionLock(ctx context.Context, data code_data.Provider, vault *common.Account) (*OptimisticVersionLock, error) {
version, err := data.GetCachedBalanceVersion(ctx, vault.PublicKey().ToBase58())
if err != nil {
return nil, err
}
return &OptimisticVersionLock{
vault: vault,
currentVersion: version,
}, nil
}

// OnCommit is called in the DB transaction updating the account's cached balance
func (l *OptimisticVersionLock) OnCommit(ctx context.Context, data code_data.Provider) error {
return data.AdvanceCachedBalanceVersion(ctx, l.vault.PublicKey().ToBase58(), l.currentVersion)
}
49 changes: 45 additions & 4 deletions pkg/code/data/balance/memory/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,54 @@ import (
)

type store struct {
mu sync.Mutex
externalCheckpointRecords []*balance.ExternalCheckpointRecord
last uint64
mu sync.Mutex
cachedBalanceVersionsByAccount map[string]uint64
externalCheckpointRecords []*balance.ExternalCheckpointRecord
last uint64
}

// New returns a new in memory balance.Store
func New() balance.Store {
return &store{}
return &store{
cachedBalanceVersionsByAccount: make(map[string]uint64),
}
}

// GetCachedVersion implements balance.Store.GetCachedVersion
func (s *store) GetCachedVersion(_ context.Context, account string) (uint64, error) {
s.mu.Lock()
defer s.mu.Unlock()

current, ok := s.cachedBalanceVersionsByAccount[account]
if !ok {
return 0, nil
}
return current, nil
}

// AdvanceCachedVersion implements balance.Store.AdvanceCachedVersion
func (s *store) AdvanceCachedVersion(_ context.Context, account string, currentVersion uint64) error {
s.mu.Lock()
defer s.mu.Unlock()

actualVersion, ok := s.cachedBalanceVersionsByAccount[account]
if !ok {
if currentVersion != 0 {
return balance.ErrStaleCachedBalanceVersion
}

s.cachedBalanceVersionsByAccount[account] = 1

return nil
}

if actualVersion != currentVersion {
return balance.ErrStaleCachedBalanceVersion
}

s.cachedBalanceVersionsByAccount[account]++

return nil
}

// SaveExternalCheckpoint implements balance.Store.SaveExternalCheckpoint
Expand Down Expand Up @@ -87,6 +127,7 @@ func (s *store) reset() {
s.mu.Lock()
defer s.mu.Unlock()

s.cachedBalanceVersionsByAccount = make(map[string]uint64)
s.externalCheckpointRecords = nil
s.last = 0
}
51 changes: 47 additions & 4 deletions pkg/code/data/balance/postgres/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ import (
"github.com/jmoiron/sqlx"

"github.com/code-payments/code-server/pkg/code/data/balance"
pg "github.com/code-payments/code-server/pkg/database/postgres"
pgutil "github.com/code-payments/code-server/pkg/database/postgres"
)

const (
externalCheckpointTableName = "codewallet__core_externalbalancecheckpoint"
cachedBalanceVersionTableName = "codewallet__core_cachedbalanceversion"
externalCheckpointTableName = "codewallet__core_externalbalancecheckpoint"
)

type externalCheckpointModel struct {
Expand All @@ -25,6 +27,49 @@ type externalCheckpointModel struct {
LastUpdatedAt time.Time `db:"last_updated_at"`
}

func dbGetCachedVersion(ctx context.Context, db *sqlx.DB, account string) (uint64, error) {
var res uint64
query := `SELECT version FROM ` + cachedBalanceVersionTableName + `
WHERE token_account = $1`
err := db.GetContext(ctx, &res, query, account)
if pg.IsNoRows(err) {
return 0, nil
} else if err != nil {
return 0, err
}
return res, nil
}

func dbAdvanceCachedVersion(ctx context.Context, db *sqlx.DB, account string, currentVersion uint64) error {
return pgutil.ExecuteInTx(ctx, db, sql.LevelDefault, func(tx *sqlx.Tx) error {
query := `INSERT INTO ` + cachedBalanceVersionTableName + `
(token_account, version)
VALUES ($1, 1)
RETURNING version
`
params := []any{account}
if currentVersion > 0 {
query = `UPDATE ` + cachedBalanceVersionTableName + `
SET version = version + 1
WHERE token_account = $1 AND version = $2
RETURNING version
`
params = append(params, currentVersion)
}

var res uint64
err := tx.GetContext(ctx, &res, query, params...)
if pg.IsNoRows(err) || pg.IsUniqueViolation(err) {
return balance.ErrStaleCachedBalanceVersion
}
if err != nil {
return err
}
return nil
})

}

func toExternalCheckpointModel(obj *balance.ExternalCheckpointRecord) (*externalCheckpointModel, error) {
if err := obj.Validate(); err != nil {
return nil, err
Expand Down Expand Up @@ -80,9 +125,7 @@ func (m *externalCheckpointModel) dbSave(ctx context.Context, db *sqlx.DB) error
func dbGetExternalCheckpoint(ctx context.Context, db *sqlx.DB, account string) (*externalCheckpointModel, error) {
res := &externalCheckpointModel{}

query := `SELECT
id, token_account, quarks, slot_checkpoint, last_updated_at
FROM ` + externalCheckpointTableName + `
query := `SELECT id, token_account, quarks, slot_checkpoint, last_updated_at FROM ` + externalCheckpointTableName + `
WHERE token_account = $1
LIMIT 1`

Expand Down
Loading
Loading