Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions app/upgrades.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"fmt"

"github.com/tellor-io/layer/app/upgrades"
v_6_1_3 "github.com/tellor-io/layer/app/upgrades/v6.1.3"
v_6_1_4 "github.com/tellor-io/layer/app/upgrades/v6.1.4"

upgradetypes "cosmossdk.io/x/upgrade/types"
)
Expand All @@ -13,20 +13,20 @@ var (
// `Upgrades` defines the upgrade handlers and store loaders for the application.
// New upgrades should be added to this slice after they are implemented.
Upgrades = []*upgrades.Upgrade{
&v_6_1_3.Upgrade,
&v_6_1_4.Upgrade,
}
Forks = []upgrades.Fork{}
)

// setupUpgradeHandlers registers the upgrade handlers to perform custom upgrade
// logic and state migrations for software upgrades.
func (app *App) setupUpgradeHandlers() {
if app.UpgradeKeeper.HasHandler(v_6_1_3.UpgradeName) {
panic(fmt.Sprintf("Cannot register duplicate upgrade handler '%s'", v_6_1_3.UpgradeName))
if app.UpgradeKeeper.HasHandler(v_6_1_4.UpgradeName) {
panic(fmt.Sprintf("Cannot register duplicate upgrade handler '%s'", v_6_1_4.UpgradeName))
}
app.UpgradeKeeper.SetUpgradeHandler(
v_6_1_3.UpgradeName,
v_6_1_3.CreateUpgradeHandler(
v_6_1_4.UpgradeName,
v_6_1_4.CreateUpgradeHandler(
app.ModuleManager(),
app.configurator,
),
Expand Down
16 changes: 16 additions & 0 deletions app/upgrades/v6.1.4/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package v6_1_4

import (
"github.com/tellor-io/layer/app/upgrades"

store "cosmossdk.io/store/types"
)

const (
	// UpgradeName is the on-chain name of this software upgrade plan.
	// It must match the name used in the governance upgrade proposal.
	UpgradeName = "v6.1.4"
)

// Upgrade wires the v6.1.4 handler into the application's upgrade list.
// StoreUpgrades is empty: this release adds no new module store keys
// (the new ReportByBlock collection appears to live under the existing
// reporter module store — confirm against the keeper wiring).
var Upgrade = upgrades.Upgrade{
	UpgradeName:   UpgradeName,
	StoreUpgrades: store.StoreUpgrades{},
}
31 changes: 31 additions & 0 deletions app/upgrades/v6.1.4/upgrade.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package v6_1_4

import (
"context"
"fmt"

upgradetypes "cosmossdk.io/x/upgrade/types"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/types/module"
)

/*
Upgrade to v6.1.4 includes:
- new ReportByBlock collection in the reporter module
for efficient pruning with early exit on block number ordering.
The old Report collection is deprecated and will drain naturally via
scan-capped pruning over 30 days; no data migration is needed.
*/

// CreateUpgradeHandler returns the upgrade handler for the v6.1.4 release.
// The handler logs the start of the upgrade and then runs all pending module
// migrations via the module manager; no extra state migration is performed
// in this release.
func CreateUpgradeHandler(
	mm *module.Manager,
	configurator module.Configurator,
) upgradetypes.UpgradeHandler {
	return func(ctx context.Context, _ upgradetypes.Plan, fromVM module.VersionMap) (module.VersionMap, error) {
		logger := sdk.UnwrapSDKContext(ctx).Logger()
		logger.Info(fmt.Sprintf("Running %s Upgrade...", UpgradeName))
		return mm.RunMigrations(ctx, configurator, fromVM)
	}
}
2 changes: 1 addition & 1 deletion e2e/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const (
)

func TestLayerUpgrade(t *testing.T) {
ChainUpgradeTest(t, "layer", "layer", "local", "v6.1.3")
ChainUpgradeTest(t, "layer", "layer", "local", "v6.1.4")
}

func ChainUpgradeTest(t *testing.T, chainName, upgradeContainerRepo, upgradeVersion, upgradeName string) {
Expand Down
21 changes: 21 additions & 0 deletions x/reporter/keeper/indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,24 @@ func newReportIndexes(sb *collections.SchemaBuilder) ReporterBlockNumberIndexes
),
}
}

// ReportByBlockIndexes holds the secondary indexes for the reporter module's
// ReportByBlock collection (primary key: reporter address, block number, queryId).
type ReportByBlockIndexes struct {
	// BlockNumber indexes entries by the block-number component (K2) of the
	// primary key, allowing iteration in block order (e.g. for pruning).
	BlockNumber *indexes.Multi[uint64, collections.Triple[[]byte, uint64, []byte], types.DelegationsAmounts]
}

// IndexesList satisfies the collections Indexes interface by returning every
// secondary index registered on the ReportByBlock map — currently only the
// BlockNumber index.
func (b ReportByBlockIndexes) IndexesList() []collections.Index[collections.Triple[[]byte, uint64, []byte], types.DelegationsAmounts] {
	all := []collections.Index[collections.Triple[[]byte, uint64, []byte], types.DelegationsAmounts]{
		b.BlockNumber,
	}
	return all
}

// newReportByBlockIndexes constructs the secondary indexes for the
// ReportByBlock collection. A single Multi index maps each entry's block
// number to its full triple primary key so callers can iterate reports in
// block-number order.
func newReportByBlockIndexes(sb *collections.SchemaBuilder) ReportByBlockIndexes {
	byBlock := indexes.NewMulti(
		sb, types.ReportByBlockNumberIndexPrefix, "report_by_block_number_index",
		collections.Uint64Key,
		collections.TripleKeyCodec(collections.BytesKey, collections.Uint64Key, collections.BytesKey),
		func(pk collections.Triple[[]byte, uint64, []byte], _ types.DelegationsAmounts) (uint64, error) {
			// The middle component (K2) of the primary key is the block number.
			return pk.K2(), nil
		},
	)
	return ReportByBlockIndexes{BlockNumber: byBlock}
}
110 changes: 89 additions & 21 deletions x/reporter/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type (
DisputedDelegationAmounts collections.Map[[]byte, types.DelegationsAmounts] // key: dispute hashId
FeePaidFromStake collections.Map[[]byte, types.DelegationsAmounts] // key: dispute hashId
Report *collections.IndexedMap[collections.Pair[[]byte, collections.Pair[[]byte, uint64]], types.DelegationsAmounts, ReporterBlockNumberIndexes] // key: queryId, (reporter AccAddress, blockNumber)
ReportByBlock *collections.IndexedMap[collections.Triple[[]byte, uint64, []byte], types.DelegationsAmounts, ReportByBlockIndexes] // key: reporter AccAddress, blockNumber, queryId

// Distribution queue collections
ReporterPeriodData collections.Map[[]byte, types.PeriodRewardData] // key: reporter AccAddress -> current period data
Expand Down Expand Up @@ -85,6 +86,11 @@ func NewKeeper(
sb, types.ReporterPrefix, "report",
collections.PairKeyCodec(collections.BytesKey, collections.PairKeyCodec(collections.BytesKey, collections.Uint64Key)), codec.CollValue[types.DelegationsAmounts](cdc), newReportIndexes(sb),
),
ReportByBlock: collections.NewIndexedMap(
sb, types.ReportByBlockPrefix, "report_by_block",
collections.TripleKeyCodec(collections.BytesKey, collections.Uint64Key, collections.BytesKey),
codec.CollValue[types.DelegationsAmounts](cdc), newReportByBlockIndexes(sb),
),
ReporterPeriodData: collections.NewMap(sb, types.ReporterPeriodDataPrefix, "reporter_period_data", collections.BytesKey, codec.CollValue[types.PeriodRewardData](cdc)),
DistributionQueue: collections.NewMap(sb, types.DistributionQueuePrefix, "distribution_queue", collections.Uint64Key, codec.CollValue[types.DistributionQueueItem](cdc)),
DistributionQueueCounter: collections.NewItem(sb, types.DistributionQueueCounterPrefix, "distribution_queue_counter", codec.CollValue[types.DistributionQueueCounter](cdc)),
Expand Down Expand Up @@ -164,22 +170,32 @@ func (k Keeper) GetDelegationsAmount(ctx context.Context, reporter []byte, block
return delAmounts, err
}

iter, err := k.Report.Indexes.BlockNumber.IterateRaw(ctx, startBuffer, endBuffer, collections.OrderDescending)
// Check new collection first
newIter, err := k.ReportByBlock.IterateRaw(ctx, startBuffer, endBuffer, collections.OrderDescending)
if err != nil {
return delAmounts, err
}
if iter.Valid() {
key, err := iter.Key()
defer newIter.Close()
if newIter.Valid() {
val, err := newIter.Value()
if err != nil {
return delAmounts, err
}
return val, nil
}

rep, err := k.Report.Get(ctx, collections.Join(key.K2(), collections.Join(key.K1().K1(), key.K1().K2())))
// Fallback to old collection only if new collection had nothing
oldIter, err := k.Report.Indexes.BlockNumber.IterateRaw(ctx, startBuffer, endBuffer, collections.OrderDescending)
if err != nil {
return delAmounts, err
}
defer oldIter.Close()
if oldIter.Valid() {
key, err := oldIter.Key()
if err != nil {
return delAmounts, err
}

return rep, nil
return k.Report.Get(ctx, collections.Join(key.K2(), collections.Join(key.K1().K1(), key.K1().K2())))
}
return delAmounts, nil
}
Expand Down Expand Up @@ -233,18 +249,33 @@ func (k Keeper) GetLastReportedAtBlock(ctx context.Context, reporter []byte) (ui
endBuf := make([]byte, pc.Size(end))
_, _ = pc.Encode(startBuf, start)
_, _ = pc.Encode(endBuf, end)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we check for `err` here? It seems like no big deal, but we do check it in GetDelegationsAmount.

iter, err := k.Report.Indexes.BlockNumber.IterateRaw(ctx, startBuf, endBuf, collections.OrderDescending)

// Check new collection first
newIter, err := k.ReportByBlock.IterateRaw(ctx, startBuf, endBuf, collections.OrderDescending)
if err != nil {
return 0, err
}
defer iter.Close()
if iter.Valid() {
key, err := iter.Key()
defer newIter.Close()
if newIter.Valid() {
key, err := newIter.Key()
if err != nil {
return 0, err
}
blockNumber := key.K1().K2()
return blockNumber, nil
return key.K2(), nil
}

// Fallback to old collection only if new collection had nothing
oldIter, err := k.Report.Indexes.BlockNumber.IterateRaw(ctx, startBuf, endBuf, collections.OrderDescending)
if err != nil {
return 0, err
}
defer oldIter.Close()
if oldIter.Valid() {
key, err := oldIter.Key()
if err != nil {
return 0, err
}
return key.K1().K2(), nil
}
return 0, nil
}
Expand All @@ -265,29 +296,66 @@ func (k Keeper) PruneOldReports(ctx context.Context, maxBatchSize int) error {
return nil
}

type reportKey = collections.Pair[[]byte, collections.Pair[[]byte, uint64]]
var toDelete []reportKey
totalDeleted := 0

// Prune old collection first
type oldKey = collections.Pair[[]byte, collections.Pair[[]byte, uint64]]
var oldToDelete []oldKey
oldScanned := 0

iter, err := k.Report.Iterate(ctx, nil)
oldIter, err := k.Report.Iterate(ctx, nil)
if err != nil {
return err
}
defer iter.Close()

for ; iter.Valid() && len(toDelete) < maxBatchSize; iter.Next() {
pk, err := iter.Key()
defer oldIter.Close()
for ; oldIter.Valid() && oldScanned < maxBatchSize && len(oldToDelete) < maxBatchSize; oldIter.Next() {
oldScanned++
pk, err := oldIter.Key()
if err != nil {
return err
}
if pk.K2().K2() < cutoffBlock {
toDelete = append(toDelete, pk)
oldToDelete = append(oldToDelete, pk)
}
}

for _, pk := range toDelete {
for _, pk := range oldToDelete {
if err := k.Report.Remove(ctx, pk); err != nil {
return err
}
totalDeleted++
}

// Iterate from lowest blockNumber, break at cutoff
remaining := maxBatchSize - totalDeleted
if remaining > 0 {
type newKey = collections.Triple[[]byte, uint64, []byte]
var newToDelete []newKey
newScanned := 0
newIter, err := k.ReportByBlock.Indexes.BlockNumber.Iterate(ctx, nil)
if err != nil {
return err
}
defer newIter.Close()
for ; newIter.Valid() && len(newToDelete) < remaining; newIter.Next() {
pk, err := newIter.PrimaryKey()
if err != nil {
return err
}

if pk.K2() >= cutoffBlock {
break
}
newToDelete = append(newToDelete, pk)
newScanned++
}

for _, pk := range newToDelete {
if err := k.ReportByBlock.Remove(ctx, pk); err != nil {
return err
}
totalDeleted++
}
}

return nil
Expand Down
Loading
Loading