diff --git a/deso/block_test.go b/deso/block_test.go index 70fee6c7..215b862a 100644 --- a/deso/block_test.go +++ b/deso/block_test.go @@ -37,7 +37,7 @@ func TestUtxoOpsProblem(t *testing.T) { rosettaIndexOpts.ValueDir = rosettaIndexDir rosettaIndex, err := badger.Open(rosettaIndexOpts) require.NoError(err) - node.Index = NewIndex(rosettaIndex) + node.Index = NewIndex(rosettaIndex, node.chainDB) // Listen to transaction and block events so we can fill RosettaIndex with relevant data node.EventManager = lib.NewEventManager() diff --git a/deso/events.go b/deso/events.go index 45b5a3da..663eeb86 100644 --- a/deso/events.go +++ b/deso/events.go @@ -36,63 +36,63 @@ func (node *Node) handleSnapshotCompleted() { // Iterate through every single public key and put a balance snapshot down // for it for this block. We don't need to worry about ancestral records here // because we haven't generated any yet. - - err := node.Index.db.Update(func(indexTxn *badger.Txn) error { - return node.GetBlockchain().DB().View(func(chainTxn *badger.Txn) error { - opts := badger.DefaultIteratorOptions - nodeIterator := chainTxn.NewIterator(opts) - defer nodeIterator.Close() - prefix := lib.Prefixes.PrefixPublicKeyToDeSoBalanceNanos - - // Partition the balances across the blocks before the snapshot block height. - totalCount := uint64(0) - for nodeIterator.Seek(prefix); nodeIterator.ValidForPrefix(prefix); nodeIterator.Next() { - totalCount++ + err := node.GetBlockchain().DB().View(func(chainTxn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + nodeIterator := chainTxn.NewIterator(opts) + defer nodeIterator.Close() + prefix := lib.Prefixes.PrefixPublicKeyToDeSoBalanceNanos + + // Partition the balances across the blocks before the snapshot block height. + totalCount := uint64(0) + for nodeIterator.Seek(prefix); nodeIterator.ValidForPrefix(prefix); nodeIterator.Next() { + totalCount++ + } + currentBlockHeight := uint64(1) + // We'll force a ceiling on this because otherwise the last block could amass O(snapshotBlockHeight) balances + balancesPerBlock := totalCount / snapshotBlockHeight + balancesMap := make(map[lib.PublicKey]uint64) + if totalCount < snapshotBlockHeight { + balancesPerBlock = 1 + } + currentCounter := uint64(0) + + for nodeIterator.Seek(prefix); nodeIterator.ValidForPrefix(prefix); nodeIterator.Next() { + key := nodeIterator.Item().Key() + keyCopy := make([]byte, len(key)) + copy(keyCopy[:], key[:]) + + valCopy, err := nodeIterator.Item().ValueCopy(nil) + if err != nil { + return errors.Wrapf(err, "Problem iterating over chain database, "+ + "on key (%v) and value (%v)", keyCopy, valCopy) } - currentBlockHeight := uint64(1) - // We'll force a ceiling on this because otherwise the last block could amass O(snapshotBlockHeight) balances - balancesPerBlock := totalCount / snapshotBlockHeight - balancesMap := make(map[lib.PublicKey]uint64) - if totalCount < snapshotBlockHeight { - balancesPerBlock = 1 - } - currentCounter := uint64(0) - - for nodeIterator.Seek(prefix); nodeIterator.ValidForPrefix(prefix); nodeIterator.Next() { - key := nodeIterator.Item().Key() - keyCopy := make([]byte, len(key)) - copy(keyCopy[:], key[:]) - - valCopy, err := nodeIterator.Item().ValueCopy(nil) - if err != nil { - return errors.Wrapf(err, "Problem iterating over chain database, "+ - "on key (%v) and value (%v)", keyCopy, valCopy) - } - balance := lib.DecodeUint64(valCopy) - pubKey := lib.NewPublicKey(key[1:]) - balancesMap[*pubKey] = balance - - if err := node.Index.PutSingleBalanceSnapshotWithTxn( - indexTxn, currentBlockHeight, false, *pubKey, balance); err != nil { - return errors.Wrapf(err, "Problem updating balance snapshot in index, "+ - "on key (%v), value (%v), and height (%v)", keyCopy, valCopy, - snapshot.CurrentEpochSnapshotMetadata.FirstSnapshotBlockHeight) - } + balance := lib.DecodeUint64(valCopy) + pubKey := lib.NewPublicKey(key[1:]) + balancesMap[*pubKey] = balance + err = node.Index.PutSingleBalanceSnapshot(currentBlockHeight, false, *pubKey, balance) + if err != nil { + return errors.Wrapf(err, "Problem updating balance snapshot in index,"+ + "on key (%v), value (%v), and height (%v)", keyCopy, valCopy, + snapshot.CurrentEpochSnapshotMetadata.FirstSnapshotBlockHeight) + } - currentCounter += 1 - if currentCounter >= balancesPerBlock && currentBlockHeight < snapshotBlockHeight { - node.Index.PutHypersyncBlockBalances(currentBlockHeight, false, balancesMap) - balancesMap = make(map[lib.PublicKey]uint64) - currentBlockHeight++ - currentCounter = 0 + currentCounter += 1 + if currentCounter >= balancesPerBlock && currentBlockHeight < snapshotBlockHeight { + if err = node.Index.PutHypersyncBlockBalances(currentBlockHeight, false, balancesMap); err != nil { + return errors.Wrapf(err, "Problem putting hypserync block balances at height %v", currentBlockHeight) } + balancesMap = make(map[lib.PublicKey]uint64) + currentBlockHeight++ + currentCounter = 0 } - if currentCounter > 0 { - node.Index.PutHypersyncBlockBalances(currentBlockHeight, false, balancesMap) + } + if currentCounter > 0 { + if err := node.Index.PutHypersyncBlockBalances(currentBlockHeight, false, balancesMap); err != nil { + return errors.Wrapf(err, "Problem putting hypserync block balances at height %v", currentBlockHeight) } - return nil - }) + } + return nil }) if err != nil { glog.Errorf(lib.CLog(lib.Red, fmt.Sprintf("handleSnapshotCompleted: error: (%v)", err))) @@ -109,87 +109,89 @@ func (node *Node) handleSnapshotCompleted() { // // TODO: Do we need to do anything special for SwapIdentity? See below for // some tricky logic there. + // This is pretty much the same as lib.DBGetAllProfilesByCoinValue but we don't load all entries into memory. + err := node.GetBlockchain().DB().View(func(chainTxn *badger.Txn) error { + dbPrefixx := append([]byte{}, lib.Prefixes.PrefixCreatorDeSoLockedNanosCreatorPKID...) + opts := badger.DefaultIteratorOptions + opts.PrefetchValues = false + // Go in reverse order since a larger count is better. + opts.Reverse = true + + it := chainTxn.NewIterator(opts) + defer it.Close() + + totalCount := uint64(0) + for it.Seek(dbPrefixx); it.ValidForPrefix(dbPrefixx); it.Next() { + totalCount++ + } + currentBlockHeight := uint64(1) + balancesPerBlock := totalCount / snapshotBlockHeight + balancesMap := make(map[lib.PublicKey]uint64) + if totalCount < snapshotBlockHeight { + balancesPerBlock = 1 + } + currentCounter := uint64(0) + + // Since we iterate backwards, the prefix must be bigger than all possible + // counts that could actually exist. We use eight bytes since the count is + // encoded as a 64-bit big-endian byte slice, which will be eight bytes long. + maxBigEndianUint64Bytes := []byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF} + prefix := append(dbPrefixx, maxBigEndianUint64Bytes...) + for it.Seek(prefix); it.ValidForPrefix(dbPrefixx); it.Next() { + rawKey := it.Item().Key() + + // Strip the prefix off the key and check its length. If it contains + // a big-endian uint64 then it should be at least eight bytes. + lockedDeSoPubKeyConcatKey := rawKey[1:] + uint64BytesLen := len(maxBigEndianUint64Bytes) + expectedLength := uint64BytesLen + btcec.PubKeyBytesLenCompressed + if len(lockedDeSoPubKeyConcatKey) != expectedLength { + return fmt.Errorf("Invalid key length %d should be at least %d", + len(lockedDeSoPubKeyConcatKey), expectedLength) + } - err := node.Index.db.Update(func(indexTxn *badger.Txn) error { - // This is pretty much the same as lib.DBGetAllProfilesByCoinValue but we don't load all entries into memory. - return node.GetBlockchain().DB().View(func(chainTxn *badger.Txn) error { - dbPrefixx := append([]byte{}, lib.Prefixes.PrefixCreatorDeSoLockedNanosCreatorPKID...) - opts := badger.DefaultIteratorOptions - opts.PrefetchValues = false - // Go in reverse order since a larger count is better. - opts.Reverse = true + lockedDeSoNanos := lib.DecodeUint64(lockedDeSoPubKeyConcatKey[:uint64BytesLen]) - it := chainTxn.NewIterator(opts) - defer it.Close() + // Appended to the stake should be the profile pub key so extract it here. + profilePKIDbytes := make([]byte, btcec.PubKeyBytesLenCompressed) + copy(profilePKIDbytes[:], lockedDeSoPubKeyConcatKey[uint64BytesLen:]) + profilePKID := lib.PublicKeyToPKID(profilePKIDbytes) - totalCount := uint64(0) - for it.Seek(dbPrefixx); it.ValidForPrefix(dbPrefixx); it.Next() { - totalCount++ + pkBytes := lib.DBGetPublicKeyForPKIDWithTxn(chainTxn, nil, profilePKID) + if pkBytes == nil { + return fmt.Errorf("DBGetPublicKeyForPKIDWithTxn: Nil pkBytes for pkid %v", + lib.PkToStringMainnet(profilePKID[:])) } - currentBlockHeight := uint64(1) - balancesPerBlock := totalCount / snapshotBlockHeight - balancesMap := make(map[lib.PublicKey]uint64) - if totalCount < snapshotBlockHeight { - balancesPerBlock = 1 + pubKey := *lib.NewPublicKey(pkBytes) + balancesMap[pubKey] = lockedDeSoNanos + + // We have to also put the balances in the other index. Not doing this would cause + // balances to return zero when we're PAST the first snapshot block height. + if err := node.Index.PutSingleBalanceSnapshot( + currentBlockHeight, true, pubKey, lockedDeSoNanos); err != nil { + return errors.Wrapf(err, "PutSingleBalanceSnapshot: problem with "+ + "pubkey (%v), lockedDeSoNanos (%v) and firstSnapshotHeight (%v)", + pubKey, lockedDeSoNanos, snapshot.CurrentEpochSnapshotMetadata.FirstSnapshotBlockHeight) } - currentCounter := uint64(0) - - // Since we iterate backwards, the prefix must be bigger than all possible - // counts that could actually exist. We use eight bytes since the count is - // encoded as a 64-bit big-endian byte slice, which will be eight bytes long. - maxBigEndianUint64Bytes := []byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF} - prefix := append(dbPrefixx, maxBigEndianUint64Bytes...) - for it.Seek(prefix); it.ValidForPrefix(dbPrefixx); it.Next() { - rawKey := it.Item().Key() - // Strip the prefix off the key and check its length. If it contains - // a big-endian uint64 then it should be at least eight bytes. - lockedDeSoPubKeyConcatKey := rawKey[1:] - uint64BytesLen := len(maxBigEndianUint64Bytes) - expectedLength := uint64BytesLen + btcec.PubKeyBytesLenCompressed - if len(lockedDeSoPubKeyConcatKey) != expectedLength { - return fmt.Errorf("Invalid key length %d should be at least %d", - len(lockedDeSoPubKeyConcatKey), expectedLength) - } - - lockedDeSoNanos := lib.DecodeUint64(lockedDeSoPubKeyConcatKey[:uint64BytesLen]) - - // Appended to the stake should be the profile pub key so extract it here. - profilePKIDbytes := make([]byte, btcec.PubKeyBytesLenCompressed) - copy(profilePKIDbytes[:], lockedDeSoPubKeyConcatKey[uint64BytesLen:]) - profilePKID := lib.PublicKeyToPKID(profilePKIDbytes) - - pkBytes := lib.DBGetPublicKeyForPKIDWithTxn(chainTxn, nil, profilePKID) - if pkBytes == nil { - return fmt.Errorf("DBGetPublicKeyForPKIDWithTxn: Nil pkBytes for pkid %v", - lib.PkToStringMainnet(profilePKID[:])) - } - pubKey := *lib.NewPublicKey(pkBytes) - balancesMap[pubKey] = lockedDeSoNanos - - // We have to also put the balances in the other index. Not doing this would cause - // balances to return zero when we're PAST the first snapshot block height. - if err := node.Index.PutSingleBalanceSnapshotWithTxn( - indexTxn, currentBlockHeight, true, pubKey, lockedDeSoNanos); err != nil { - - return errors.Wrapf(err, "PutSingleBalanceSnapshotWithTxn: problem with "+ - "pubkey (%v), lockedDeSoNanos (%v) and firstSnapshotHeight (%v)", - pubKey, lockedDeSoNanos, snapshot.CurrentEpochSnapshotMetadata.FirstSnapshotBlockHeight) - } - - currentCounter += 1 - if currentCounter >= balancesPerBlock && currentBlockHeight < snapshotBlockHeight { - node.Index.PutHypersyncBlockBalances(currentBlockHeight, true, balancesMap) - balancesMap = make(map[lib.PublicKey]uint64) - currentBlockHeight++ - currentCounter = 0 + currentCounter += 1 + if currentCounter >= balancesPerBlock && currentBlockHeight < snapshotBlockHeight { + err := node.Index.PutHypersyncBlockBalances(currentBlockHeight, true, balancesMap) + if err != nil { + return errors.Wrapf(err, "Problem putting hypserync block balances at height %v", currentBlockHeight) } + balancesMap = make(map[lib.PublicKey]uint64) + currentBlockHeight++ + currentCounter = 0 } - if currentCounter > 0 { - node.Index.PutHypersyncBlockBalances(currentBlockHeight, true, balancesMap) + } + if currentCounter > 0 { + err := node.Index.PutHypersyncBlockBalances(currentBlockHeight, true, balancesMap) + if err != nil { + return errors.Wrapf(err, "Problem putting hypserync block balances at height %v", currentBlockHeight) } - return nil - }) + } + return nil }) if err != nil { glog.Errorf(lib.CLog(lib.Red, fmt.Sprintf("handleSnapshotCompleted: Problem iterating locked "+ @@ -223,16 +225,9 @@ func (node *Node) handleBlockConnected(event *lib.BlockEvent) { // don't have a snapshot. We output extra metadata for this block to ensure // Rosetta connects it appropriately. - // Save the UTXOOps. These are used to compute all of the meta information - // that Rosetta needs. - err := node.Index.PutUtxoOps(event.Block, event.UtxoOps) - if err != nil { - glog.Errorf("PutSpentUtxos: %v", err) - } - // Save a balance snapshot balances := event.UtxoView.PublicKeyToDeSoBalanceNanos - err = node.Index.PutBalanceSnapshot(event.Block.Header.Height, false, balances) + err := node.Index.PutBalanceSnapshot(event.Block.Header.Height, false, balances) if err != nil { glog.Errorf("PutBalanceSnapshot: %v", err) } diff --git a/deso/index.go b/deso/index.go index 65ddf714..a61f7910 100644 --- a/deso/index.go +++ b/deso/index.go @@ -1,8 +1,7 @@ package deso import ( - "bytes" - "encoding/gob" + "github.com/btcsuite/btcd/btcec" "github.com/deso-protocol/core/lib" "github.com/dgraph-io/badger/v3" "github.com/golang/glog" @@ -26,16 +25,21 @@ const ( // See comments in block.go and convertBlock() for more details. // -> map[lib.PublicKey]uint64 PrefixHypersyncBlockHeightToBalances = byte(3) + + PrefixHypersyncBlockHeightLockedPublicKeyToBalance = byte(4) ) type RosettaIndex struct { - db *badger.DB - dbMutex sync.Mutex + db *badger.DB + dbMutex sync.Mutex + chainDB *badger.DB + snapshot *lib.Snapshot } -func NewIndex(db *badger.DB) *RosettaIndex { +func NewIndex(db *badger.DB, chainDB *badger.DB) *RosettaIndex { return &RosettaIndex{ - db: db, + db: db, + chainDB: chainDB, } } @@ -49,49 +53,12 @@ func (index *RosettaIndex) utxoOpsKey(blockHash *lib.BlockHash) []byte { return prefix } -func (index *RosettaIndex) PutUtxoOps(block *lib.MsgDeSoBlock, utxoOps [][]*lib.UtxoOperation) error { - blockHash, err := block.Hash() - if err != nil { - return err - } - - opBundle := &lib.UtxoOperationBundle{ - UtxoOpBundle: utxoOps, - } - bytes := lib.EncodeToBytes(block.Header.Height, opBundle) - - return index.db.Update(func(txn *badger.Txn) error { - return txn.Set(index.utxoOpsKey(blockHash), bytes) - }) -} - func (index *RosettaIndex) GetUtxoOps(block *lib.MsgDeSoBlock) ([][]*lib.UtxoOperation, error) { blockHash, err := block.Hash() if err != nil { return nil, err } - - opBundle := &lib.UtxoOperationBundle{} - - err = index.db.View(func(txn *badger.Txn) error { - utxoOpsItem, err := txn.Get(index.utxoOpsKey(blockHash)) - if err != nil { - return err - } - - return utxoOpsItem.Value(func(valBytes []byte) error { - rr := bytes.NewReader(valBytes) - if exist, err := lib.DecodeFromBytes(opBundle, rr); !exist || err != nil { - return errors.Wrapf(err, "Problem decoding utxoops, exist: (%v)", exist) - } - return nil - }) - }) - if err != nil { - return nil, err - } - - return opBundle.UtxoOpBundle, nil + return lib.GetUtxoOperationsForBlock(index.chainDB, index.snapshot, blockHash) } // @@ -111,6 +78,31 @@ func balanceSnapshotKey(isLockedBalance bool, publicKey *lib.PublicKey, blockHei return prefix } +func hypersyncHeightToToBlockPublicKeyBalance(blockHeight uint64, isLocked bool, publicKey lib.PublicKey) []byte { + lockedByte := byte(0) + if isLocked { + lockedByte = byte(1) + } + + prefix := append([]byte{}, PrefixHypersyncBlockHeightLockedPublicKeyToBalance) + prefix = append(prefix, lib.EncodeUint64(blockHeight)...) + prefix = append(prefix, lockedByte) + prefix = append(prefix, publicKey.ToBytes()...) + return prefix +} + +func hypersyncHeightToBlockPublicKeyBalancePrefix(blockHeight uint64, isLocked bool) []byte { + lockedByte := byte(0) + if isLocked { + lockedByte = byte(1) + } + + prefix := append([]byte{}, PrefixHypersyncBlockHeightLockedPublicKeyToBalance) + prefix = append(prefix, lib.EncodeUint64(blockHeight)...) + prefix = append(prefix, lockedByte) + return prefix +} + func hypersyncHeightToBlockKey(blockHeight uint64, isLocked bool) []byte { lockedByte := byte(0) @@ -124,18 +116,43 @@ func hypersyncHeightToBlockKey(blockHeight uint64, isLocked bool) []byte { return prefix } -func (index *RosettaIndex) PutHypersyncBlockBalances(blockHeight uint64, isLocked bool, balances map[lib.PublicKey]uint64) { +type PubKeyBalance struct { + PublicKey lib.PublicKey + Balance uint64 +} - err := index.db.Update(func(txn *badger.Txn) error { - blockBytes := bytes.NewBuffer([]byte{}) - if err := gob.NewEncoder(blockBytes).Encode(&balances); err != nil { - return err +func (index *RosettaIndex) PutHypersyncBlockBalances(blockHeight uint64, isLocked bool, balances map[lib.PublicKey]uint64) error { + var balanceStructs []PubKeyBalance + for pubKey, balance := range balances { + balanceStructs = append(balanceStructs, PubKeyBalance{pubKey, balance}) + } + chunks := ChunkArray(balanceStructs, 1000) + for _, chunk := range chunks { + err := index.db.Update(func(txn *badger.Txn) error { + for _, pubKeyBalance := range chunk { + if err := txn.Set(hypersyncHeightToToBlockPublicKeyBalance(blockHeight, isLocked, pubKeyBalance.PublicKey), lib.UintToBuf(pubKeyBalance.Balance)); err != nil { + return errors.Wrapf(err, "PutHypersyncBlockBalance: Problem putting balance for pub key: %v", lib.PkToStringBoth(pubKeyBalance.PublicKey.ToBytes())) + } + } + return nil + }) + if err != nil { + return errors.Wrapf(err, "PutHypersyncBlockBalances: Problem putting balances for block %v", blockHeight) } - return txn.Set(hypersyncHeightToBlockKey(blockHeight, isLocked), blockBytes.Bytes()) - }) - if err != nil { - glog.Error(errors.Wrapf(err, "PutHypersyncBlockBalances: Problem putting block: Error:")) } + return nil +} + +func ChunkArray[T any](arr []T, chunkSize int) [][]T { + var chunks [][]T + for i := 0; i < len(arr); i += chunkSize { + end := i + chunkSize + if end > len(arr) { + end = len(arr) + } + chunks = append(chunks, arr[i:end]) + } + return chunks } func (index *RosettaIndex) GetHypersyncBlockBalances(blockHeight uint64) ( @@ -144,31 +161,18 @@ func (index *RosettaIndex) GetHypersyncBlockBalances(blockHeight uint64) ( balances := make(map[lib.PublicKey]uint64) lockedBalances := make(map[lib.PublicKey]uint64) err := index.db.View(func(txn *badger.Txn) error { - itemBalances, err := txn.Get(hypersyncHeightToBlockKey(blockHeight, false)) - if err != nil { - return err + var innerErr error + balances, innerErr = index.GetHypersyncBlockBalanceByLockStatus(txn, blockHeight, false) + if innerErr != nil { + return innerErr } - balancesBytes, err := itemBalances.ValueCopy(nil) - if err != nil { - return err - } - if err := gob.NewDecoder(bytes.NewReader(balancesBytes)).Decode(&balances); err != nil { - return err - } - - itemLocked, err := txn.Get(hypersyncHeightToBlockKey(blockHeight, true)) - if err != nil { - return err - } - lockedBytes, err := itemLocked.ValueCopy(nil) - if err != nil { - return err - } - if err := gob.NewDecoder(bytes.NewReader(lockedBytes)).Decode(&lockedBalances); err != nil { - return err + lockedBalances, innerErr = index.GetHypersyncBlockBalanceByLockStatus(txn, blockHeight, true) + if innerErr != nil { + return innerErr } return nil }) + //TODO: should this function return an error? if err != nil { glog.Error(errors.Wrapf(err, "GetHypersyncBlockBalances: Problem getting block at height (%v)", blockHeight)) } @@ -176,26 +180,63 @@ func (index *RosettaIndex) GetHypersyncBlockBalances(blockHeight uint64) ( return balances, lockedBalances } -func (index *RosettaIndex) PutBalanceSnapshot( - height uint64, isLockedBalance bool, balances map[lib.PublicKey]uint64) error { +func (index *RosettaIndex) GetHypersyncBlockBalanceByLockStatus(txn *badger.Txn, blockHeight uint64, isLocked bool) (map[lib.PublicKey]uint64, error) { + opts := badger.DefaultIteratorOptions + opts.PrefetchValues = true // TODO: what's the right value here? - return index.db.Update(func(txn *badger.Txn) error { - return index.PutBalanceSnapshotWithTxn(txn, height, isLockedBalance, balances) - }) + it := txn.NewIterator(opts) + defer it.Close() + + res := make(map[lib.PublicKey]uint64) + locationInKey := 1 + 8 + 1 // 1 byte for prefix, 8 bytes for block height, 1 byte for isLocked + prefix := hypersyncHeightToBlockPublicKeyBalancePrefix(blockHeight, isLocked) + for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { + item := it.Item() + key := item.Key() + if len(key) != locationInKey+btcec.PubKeyBytesLenCompressed { + return nil, errors.Errorf("GetHypersyncBlockBalanceByLockStatus: Invalid key length: %v", key) + } + value, err := item.ValueCopy(nil) + if err != nil { + return nil, errors.Wrapf(err, "GetHypersyncBlockBalanceByLockStatus: Problem getting value for key: %v", key) + } + res[*lib.NewPublicKey(key[locationInKey:])] = lib.DecodeUint64(value) // TODO: validation that bytes from key are valid public key. + } + return res, nil } -func (index *RosettaIndex) PutBalanceSnapshotWithTxn( - txn *badger.Txn, height uint64, isLockedBalance bool, balances map[lib.PublicKey]uint64) error { +func (index *RosettaIndex) PutBalanceSnapshot( + height uint64, isLockedBalance bool, balances map[lib.PublicKey]uint64) error { - for pk, bal := range balances { - if err := txn.Set(balanceSnapshotKey(isLockedBalance, &pk, height, bal), []byte{}); err != nil { - return errors.Wrapf(err, "Error in PutBalanceSnapshot for block height: "+ - "%v pub key: %v balance: %v", height, pk, bal) + var balanceStructs []PubKeyBalance + for pubKey, balance := range balances { + balanceStructs = append(balanceStructs, PubKeyBalance{pubKey, balance}) + } + chunks := ChunkArray(balanceStructs, 1000) + for _, chunk := range chunks { + err := index.db.Update(func(txn *badger.Txn) error { + for _, pubKeyBalance := range chunk { + if err := txn.Set(balanceSnapshotKey(isLockedBalance, &pubKeyBalance.PublicKey, height, pubKeyBalance.Balance), []byte{}); err != nil { + return errors.Wrapf(err, "PutBalanceSnapshot: Problem putting balance for pub key: %v", lib.PkToStringBoth(pubKeyBalance.PublicKey.ToBytes())) + } + } + return nil + }) + if err != nil { + return errors.Wrapf(err, "PutBalanceSnapshot: Problem putting balances for block %v", height) } } return nil } +func (index *RosettaIndex) PutSingleBalanceSnapshot( + height uint64, isLockedBalance bool, publicKey lib.PublicKey, balance uint64) error { + + return index.db.Update(func(txn *badger.Txn) error { + return index.PutSingleBalanceSnapshotWithTxn(txn, height, isLockedBalance, publicKey, balance) + }) +} + func (index *RosettaIndex) PutSingleBalanceSnapshotWithTxn( txn *badger.Txn, height uint64, isLockedBalance bool, publicKey lib.PublicKey, balance uint64) error { diff --git a/deso/node.go b/deso/node.go index a7e3f523..91d7f764 100644 --- a/deso/node.go +++ b/deso/node.go @@ -231,10 +231,10 @@ func (node *Node) Start(exitChannels ...*chan os.Signal) { // Setup rosetta index rosettaIndexDir := filepath.Join(node.Config.DataDirectory, "index") - rosettaIndexOpts := lib.PerformanceBadgerOptions(rosettaIndexDir) + rosettaIndexOpts := lib.DefaultBadgerOptions(rosettaIndexDir) rosettaIndexOpts.ValueDir = rosettaIndexDir rosettaIndex, err := badger.Open(rosettaIndexOpts) - node.Index = NewIndex(rosettaIndex) + node.Index = NewIndex(rosettaIndex, node.chainDB) // Listen to transaction and block events so we can fill RosettaIndex with relevant data node.EventManager = lib.NewEventManager() @@ -292,6 +292,7 @@ func (node *Node) Start(exitChannels ...*chan os.Signal) { node.nodeMessageChan, node.Config.ForceChecksum, ) + node.Index.snapshot = node.GetBlockchain().Snapshot() if err != nil { if shouldRestart { glog.Infof(lib.CLog(lib.Red, fmt.Sprintf("Start: Got en error while starting server and shouldRestart "+