diff --git a/app/upgrades.go b/app/upgrades.go
index b3a0e2ed6..0a3132641 100644
--- a/app/upgrades.go
+++ b/app/upgrades.go
@@ -4,7 +4,7 @@ import (
 	"fmt"
 
 	"github.com/tellor-io/layer/app/upgrades"
-	v_6_1_3 "github.com/tellor-io/layer/app/upgrades/v6.1.3"
+	v_6_1_4 "github.com/tellor-io/layer/app/upgrades/v6.1.4"
 
 	upgradetypes "cosmossdk.io/x/upgrade/types"
 )
@@ -13,7 +13,7 @@ var (
 	// `Upgrades` defines the upgrade handlers and store loaders for the application.
 	// New upgrades should be added to this slice after they are implemented.
 	Upgrades = []*upgrades.Upgrade{
-		&v_6_1_3.Upgrade,
+		&v_6_1_4.Upgrade,
 	}
 	Forks = []upgrades.Fork{}
 )
@@ -21,12 +21,12 @@ var (
 // setupUpgradeHandlers registers the upgrade handlers to perform custom upgrade
 // logic and state migrations for software upgrades.
 func (app *App) setupUpgradeHandlers() {
-	if app.UpgradeKeeper.HasHandler(v_6_1_3.UpgradeName) {
-		panic(fmt.Sprintf("Cannot register duplicate upgrade handler '%s'", v_6_1_3.UpgradeName))
+	if app.UpgradeKeeper.HasHandler(v_6_1_4.UpgradeName) {
+		panic(fmt.Sprintf("Cannot register duplicate upgrade handler '%s'", v_6_1_4.UpgradeName))
 	}
 	app.UpgradeKeeper.SetUpgradeHandler(
-		v_6_1_3.UpgradeName,
-		v_6_1_3.CreateUpgradeHandler(
+		v_6_1_4.UpgradeName,
+		v_6_1_4.CreateUpgradeHandler(
 			app.ModuleManager(),
 			app.configurator,
 		),
diff --git a/app/upgrades/v6.1.4/constants.go b/app/upgrades/v6.1.4/constants.go
new file mode 100644
index 000000000..11215a8a2
--- /dev/null
+++ b/app/upgrades/v6.1.4/constants.go
@@ -0,0 +1,16 @@
+package v6_1_4
+
+import (
+	"github.com/tellor-io/layer/app/upgrades"
+
+	store "cosmossdk.io/store/types"
+)
+
+const (
+	UpgradeName = "v6.1.4"
+)
+
+var Upgrade = upgrades.Upgrade{
+	UpgradeName:   UpgradeName,
+	StoreUpgrades: store.StoreUpgrades{},
+}
diff --git a/app/upgrades/v6.1.4/upgrade.go b/app/upgrades/v6.1.4/upgrade.go
new file mode 100644
index 000000000..922b965cf
--- /dev/null
+++ b/app/upgrades/v6.1.4/upgrade.go
@@ -0,0 +1,31 @@
+package v6_1_4
+
+import (
+	"context"
+	"fmt"
+
+	upgradetypes "cosmossdk.io/x/upgrade/types"
+
+	sdk "github.com/cosmos/cosmos-sdk/types"
+	"github.com/cosmos/cosmos-sdk/types/module"
+)
+
+/*
+Upgrade to v6.1.4 includes:
+  - new ReportByBlock collection in the reporter module
+    for efficient pruning with early exit on block number ordering.
+    The old Report collection is deprecated and will drain naturally via
+    scan capped pruning over 30 days. No data migration needed.
+*/
+
+func CreateUpgradeHandler(
+	mm *module.Manager,
+	configurator module.Configurator,
+) upgradetypes.UpgradeHandler {
+	return func(ctx context.Context, _ upgradetypes.Plan, vm module.VersionMap) (module.VersionMap, error) {
+		sdkCtx := sdk.UnwrapSDKContext(ctx)
+		sdkCtx.Logger().Info(fmt.Sprintf("Running %s Upgrade...", UpgradeName))
+
+		return mm.RunMigrations(ctx, configurator, vm)
+	}
+}
diff --git a/e2e/upgrade_test.go b/e2e/upgrade_test.go
index e73bef2ed..129dad3d7 100644
--- a/e2e/upgrade_test.go
+++ b/e2e/upgrade_test.go
@@ -28,7 +28,7 @@ const (
 )
 
 func TestLayerUpgrade(t *testing.T) {
-	ChainUpgradeTest(t, "layer", "layer", "local", "v6.1.3")
+	ChainUpgradeTest(t, "layer", "layer", "local", "v6.1.4")
 }
 
 func ChainUpgradeTest(t *testing.T, chainName, upgradeContainerRepo, upgradeVersion, upgradeName string) {
diff --git a/x/reporter/keeper/indexes.go b/x/reporter/keeper/indexes.go
index 19ba23b51..20ea0c34a 100644
--- a/x/reporter/keeper/indexes.go
+++ b/x/reporter/keeper/indexes.go
@@ -44,3 +44,24 @@ func newReportIndexes(sb *collections.SchemaBuilder) ReporterBlockNumberIndexes
 		),
 	}
 }
+
+type ReportByBlockIndexes struct {
+	BlockNumber *indexes.Multi[uint64, collections.Triple[[]byte, uint64, []byte], types.DelegationsAmounts]
+}
+
+func (b ReportByBlockIndexes) IndexesList() []collections.Index[collections.Triple[[]byte, uint64, []byte], types.DelegationsAmounts] {
+	return []collections.Index[collections.Triple[[]byte, uint64, []byte], types.DelegationsAmounts]{b.BlockNumber}
+}
+
+func newReportByBlockIndexes(sb *collections.SchemaBuilder) ReportByBlockIndexes {
+	return ReportByBlockIndexes{
+		BlockNumber: indexes.NewMulti(
+			sb, types.ReportByBlockNumberIndexPrefix, "report_by_block_number_index",
+			collections.Uint64Key,
+			collections.TripleKeyCodec(collections.BytesKey, collections.Uint64Key, collections.BytesKey),
+			func(pk collections.Triple[[]byte, uint64, []byte], _ types.DelegationsAmounts) (uint64, error) {
+				return pk.K2(), nil // blockNumber
+			},
+		),
+	}
+}
diff --git a/x/reporter/keeper/keeper.go b/x/reporter/keeper/keeper.go
index 49954f8b6..a0e13727c 100644
--- a/x/reporter/keeper/keeper.go
+++ b/x/reporter/keeper/keeper.go
@@ -30,6 +30,7 @@ type (
 		DisputedDelegationAmounts collections.Map[[]byte, types.DelegationsAmounts]                                                                        // key: dispute hashId
 		FeePaidFromStake          collections.Map[[]byte, types.DelegationsAmounts]                                                                        // key: dispute hashId
 		Report                    *collections.IndexedMap[collections.Pair[[]byte, collections.Pair[[]byte, uint64]], types.DelegationsAmounts, ReporterBlockNumberIndexes] // key: queryId, (reporter AccAddress, blockNumber)
+		ReportByBlock             *collections.IndexedMap[collections.Triple[[]byte, uint64, []byte], types.DelegationsAmounts, ReportByBlockIndexes]      // key: reporter AccAddress, blockNumber, queryId
 
 		// Distribution queue collections
 		ReporterPeriodData collections.Map[[]byte, types.PeriodRewardData] // key: reporter AccAddress -> current period data
@@ -85,6 +86,11 @@ func NewKeeper(
 			sb, types.ReporterPrefix, "report",
 			collections.PairKeyCodec(collections.BytesKey, collections.PairKeyCodec(collections.BytesKey, collections.Uint64Key)), codec.CollValue[types.DelegationsAmounts](cdc), newReportIndexes(sb),
 		),
+		ReportByBlock: collections.NewIndexedMap(
+			sb, types.ReportByBlockPrefix, "report_by_block",
+			collections.TripleKeyCodec(collections.BytesKey, collections.Uint64Key, collections.BytesKey),
+			codec.CollValue[types.DelegationsAmounts](cdc), newReportByBlockIndexes(sb),
+		),
 		ReporterPeriodData:       collections.NewMap(sb, types.ReporterPeriodDataPrefix, "reporter_period_data", collections.BytesKey, codec.CollValue[types.PeriodRewardData](cdc)),
 		DistributionQueue:        collections.NewMap(sb, types.DistributionQueuePrefix, "distribution_queue", collections.Uint64Key, codec.CollValue[types.DistributionQueueItem](cdc)),
 		DistributionQueueCounter: collections.NewItem(sb, types.DistributionQueueCounterPrefix, "distribution_queue_counter", codec.CollValue[types.DistributionQueueCounter](cdc)),
@@ -164,22 +170,32 @@ func (k Keeper) GetDelegationsAmount(ctx context.Context, reporter []byte, block
 		return delAmounts, err
 	}
 
-	iter, err := k.Report.Indexes.BlockNumber.IterateRaw(ctx, startBuffer, endBuffer, collections.OrderDescending)
+	// Check new collection first
+	newIter, err := k.ReportByBlock.IterateRaw(ctx, startBuffer, endBuffer, collections.OrderDescending)
 	if err != nil {
 		return delAmounts, err
 	}
-	if iter.Valid() {
-		key, err := iter.Key()
+	defer newIter.Close()
+	if newIter.Valid() {
+		val, err := newIter.Value()
 		if err != nil {
 			return delAmounts, err
 		}
+		return val, nil
+	}
 
-		rep, err := k.Report.Get(ctx, collections.Join(key.K2(), collections.Join(key.K1().K1(), key.K1().K2())))
+	// Fallback to old collection only if new collection had nothing
+	oldIter, err := k.Report.Indexes.BlockNumber.IterateRaw(ctx, startBuffer, endBuffer, collections.OrderDescending)
+	if err != nil {
+		return delAmounts, err
+	}
+	defer oldIter.Close()
+	if oldIter.Valid() {
+		key, err := oldIter.Key()
 		if err != nil {
 			return delAmounts, err
 		}
-
-		return rep, nil
+		return k.Report.Get(ctx, collections.Join(key.K2(), collections.Join(key.K1().K1(), key.K1().K2())))
 	}
 	return delAmounts, nil
 }
@@ -233,18 +249,33 @@ func (k Keeper) GetLastReportedAtBlock(ctx context.Context, reporter []byte) (ui
 	endBuf := make([]byte, pc.Size(end))
 	_, _ = pc.Encode(startBuf, start)
 	_, _ = pc.Encode(endBuf, end)
-	iter, err := k.Report.Indexes.BlockNumber.IterateRaw(ctx, startBuf, endBuf, collections.OrderDescending)
+
+	// Check new collection first
+	newIter, err := k.ReportByBlock.IterateRaw(ctx, startBuf, endBuf, collections.OrderDescending)
 	if err != nil {
 		return 0, err
 	}
-	defer iter.Close()
-	if iter.Valid() {
-		key, err := iter.Key()
+	defer newIter.Close()
+	if newIter.Valid() {
+		key, err := newIter.Key()
 		if err != nil {
 			return 0, err
 		}
-		blockNumber := key.K1().K2()
-		return blockNumber, nil
+		return key.K2(), nil
+	}
+
+	// Fallback to old collection only if new collection had nothing
+	oldIter, err := k.Report.Indexes.BlockNumber.IterateRaw(ctx, startBuf, endBuf, collections.OrderDescending)
+	if err != nil {
+		return 0, err
+	}
+	defer oldIter.Close()
+	if oldIter.Valid() {
+		key, err := oldIter.Key()
+		if err != nil {
+			return 0, err
+		}
+		return key.K1().K2(), nil
 	}
 	return 0, nil
 }
@@ -265,29 +296,64 @@ func (k Keeper) PruneOldReports(ctx context.Context, maxBatchSize int) error {
 		return nil
 	}
 
-	type reportKey = collections.Pair[[]byte, collections.Pair[[]byte, uint64]]
-	var toDelete []reportKey
+	totalDeleted := 0
+
+	// Prune old collection first
+	type oldKey = collections.Pair[[]byte, collections.Pair[[]byte, uint64]]
+	var oldToDelete []oldKey
+	oldScanned := 0
 
-	iter, err := k.Report.Iterate(ctx, nil)
+	oldIter, err := k.Report.Iterate(ctx, nil)
 	if err != nil {
 		return err
 	}
-	defer iter.Close()
-
-	for ; iter.Valid() && len(toDelete) < maxBatchSize; iter.Next() {
-		pk, err := iter.Key()
+	defer oldIter.Close()
+	for ; oldIter.Valid() && oldScanned < maxBatchSize && len(oldToDelete) < maxBatchSize; oldIter.Next() {
+		oldScanned++
+		pk, err := oldIter.Key()
 		if err != nil {
 			return err
 		}
 		if pk.K2().K2() < cutoffBlock {
-			toDelete = append(toDelete, pk)
+			oldToDelete = append(oldToDelete, pk)
 		}
 	}
 
-	for _, pk := range toDelete {
+	for _, pk := range oldToDelete {
 		if err := k.Report.Remove(ctx, pk); err != nil {
 			return err
 		}
+		totalDeleted++
+	}
+
+	// Iterate from lowest blockNumber, break at cutoff
+	remaining := maxBatchSize - totalDeleted
+	if remaining > 0 {
+		type newKey = collections.Triple[[]byte, uint64, []byte]
+		var newToDelete []newKey
+		newIter, err := k.ReportByBlock.Indexes.BlockNumber.Iterate(ctx, nil)
+		if err != nil {
+			return err
+		}
+		defer newIter.Close()
+		for ; newIter.Valid() && len(newToDelete) < remaining; newIter.Next() {
+			pk, err := newIter.PrimaryKey()
+			if err != nil {
+				return err
+			}
+
+			if pk.K2() >= cutoffBlock {
+				break
+			}
+			newToDelete = append(newToDelete, pk)
+		}
+
+		for _, pk := range newToDelete {
+			if err := k.ReportByBlock.Remove(ctx, pk); err != nil {
+				return err
+			}
+			totalDeleted++
+		}
 	}
 	return nil
 }
diff --git a/x/reporter/keeper/keeper_test.go b/x/reporter/keeper/keeper_test.go
index 6786bf964..e772c12ab 100644
--- a/x/reporter/keeper/keeper_test.go
+++ b/x/reporter/keeper/keeper_test.go
@@ -224,6 +224,62 @@ func TestGetDelegationsAmount(t *testing.T) {
 	require.Equal(t, math.NewInt(15000), total)
 }
 
+func TestTransitionPathsPreferNewCollection(t *testing.T) {
+	k, _, _, _, _, ctx, _ := setupKeeper(t)
+	reporter := sample.AccAddressBytes()
+	ctx = ctx.WithBlockHeight(10)
+
+	// Legacy snapshot exists.
+	require.NoError(t, k.Report.Set(
+		ctx,
+		collections.Join([]byte("legacy-query"), collections.Join(reporter.Bytes(), uint64(4))),
+		types.DelegationsAmounts{Total: math.NewInt(100)},
+	))
+
+	// New snapshot at a later eligible block should be preferred.
+	require.NoError(t, k.ReportByBlock.Set(
+		ctx,
+		collections.Join3(reporter.Bytes(), uint64(6), []byte("new-query")),
+		types.DelegationsAmounts{Total: math.NewInt(250)},
+	))
+
+	got, err := k.GetDelegationsAmount(ctx, reporter, 10)
+	require.NoError(t, err)
+	require.Equal(t, math.NewInt(250), got.Total)
+
+	last, err := k.GetLastReportedAtBlock(ctx, reporter)
+	require.NoError(t, err)
+	require.Equal(t, uint64(6), last)
+}
+
+func TestTransitionPathsFallbackToOldCollection(t *testing.T) {
+	k, _, _, _, _, ctx, _ := setupKeeper(t)
+	reporter := sample.AccAddressBytes()
+	ctx = ctx.WithBlockHeight(5)
+
+	// Legacy snapshot should still be served when new collection has no eligible entry.
+	require.NoError(t, k.Report.Set(
+		ctx,
+		collections.Join([]byte("legacy-query"), collections.Join(reporter.Bytes(), uint64(4))),
+		types.DelegationsAmounts{Total: math.NewInt(123)},
+	))
+
+	// New snapshot exists but is outside the requested/current block window.
+	require.NoError(t, k.ReportByBlock.Set(
+		ctx,
+		collections.Join3(reporter.Bytes(), uint64(8), []byte("new-query")),
+		types.DelegationsAmounts{Total: math.NewInt(999)},
+	))
+
+	got, err := k.GetDelegationsAmount(ctx, reporter, 5)
+	require.NoError(t, err)
+	require.Equal(t, math.NewInt(123), got.Total)
+
+	last, err := k.GetLastReportedAtBlock(ctx, reporter)
+	require.NoError(t, err)
+	require.Equal(t, uint64(4), last)
+}
+
 // called in endblocker
 func BenchmarkReporterTrackStakeChange(b *testing.B) {
 	k, sk, _, _, _, ctx, _ := setupKeeper(b)
@@ -343,6 +399,65 @@ func TestPruneOldReportsMaxIterations(t *testing.T) {
 	require.Equal(t, 2, count)
 }
 
+func TestPruneNewCollectionBlockOrder(t *testing.T) {
+	k, _, _, _, _, ctx, _ := setupKeeper(t)
+	reporter1 := sample.AccAddressBytes()
+	reporter2 := sample.AccAddressBytes()
+
+	now := time.Now()
+	ctx = ctx.WithBlockTime(now).WithBlockHeight(500)
+
+	require.NoError(t, k.ReportByBlock.Set(ctx, collections.Join3(reporter1.Bytes(), uint64(50), []byte("q1")), types.DelegationsAmounts{Total: math.OneInt()}))
+	require.NoError(t, k.ReportByBlock.Set(ctx, collections.Join3(reporter2.Bytes(), uint64(300), []byte("q1")), types.DelegationsAmounts{Total: math.OneInt()}))
+	require.NoError(t, k.ReportByBlock.Set(ctx, collections.Join3(reporter1.Bytes(), uint64(10), []byte("q1")), types.DelegationsAmounts{Total: math.OneInt()}))
+	require.NoError(t, k.ReportByBlock.Set(ctx, collections.Join3(reporter2.Bytes(), uint64(200), []byte("q1")), types.DelegationsAmounts{Total: math.OneInt()}))
+	require.NoError(t, k.ReportByBlock.Set(ctx, collections.Join3(reporter1.Bytes(), uint64(400), []byte("q1")), types.DelegationsAmounts{Total: math.OneInt()}))
+
+	// Verify Multi index iterates in ascending blockNumber order
+	iter, err := k.ReportByBlock.Indexes.BlockNumber.Iterate(ctx, nil)
+	require.NoError(t, err)
+	defer iter.Close()
+
+	var blockNumbers []uint64
+	for ; iter.Valid(); iter.Next() {
+		pk, err := iter.PrimaryKey()
+		require.NoError(t, err)
+		blockNumbers = append(blockNumbers, pk.K2())
+	}
+	require.Equal(t, []uint64{10, 50, 200, 300, 400}, blockNumbers)
+
+	// Now prune with cutoff at 250 — should delete blocks 10, 50, 200 and stop
+	oracleKeeper := mocks.NewOracleKeeper(t)
+	oracleKeeper.On("GetBlockHeightFromTimestamp", mock.Anything, mock.Anything).Return(uint64(250), nil)
+	k.SetOracleKeeper(oracleKeeper)
+
+	err = k.PruneOldReports(ctx, 100)
+	require.NoError(t, err)
+
+	// Verify only blocks 300 and 400 remain
+	remaining := 0
+	err = k.ReportByBlock.Walk(ctx, nil, func(_ collections.Triple[[]byte, uint64, []byte], _ types.DelegationsAmounts) (bool, error) {
+		remaining++
+		return false, nil
+	})
+	require.NoError(t, err)
+	require.Equal(t, 2, remaining)
+
+	// Verify the correct entries survived
+	_, err = k.ReportByBlock.Get(ctx, collections.Join3(reporter2.Bytes(), uint64(300), []byte("q1")))
+	require.NoError(t, err)
+	_, err = k.ReportByBlock.Get(ctx, collections.Join3(reporter1.Bytes(), uint64(400), []byte("q1")))
+	require.NoError(t, err)
+
+	// Verify old entries are gone
+	_, err = k.ReportByBlock.Get(ctx, collections.Join3(reporter1.Bytes(), uint64(10), []byte("q1")))
+	require.Error(t, err)
+	_, err = k.ReportByBlock.Get(ctx, collections.Join3(reporter1.Bytes(), uint64(50), []byte("q1")))
+	require.Error(t, err)
+	_, err = k.ReportByBlock.Get(ctx, collections.Join3(reporter2.Bytes(), uint64(200), []byte("q1")))
+	require.Error(t, err)
+}
+
 func TestStakeRecalcFlag(t *testing.T) {
 	k, _, _, _, _, ctx, _ := setupKeeper(t)
 	reporter := sample.AccAddressBytes()
diff --git a/x/reporter/keeper/reporter.go b/x/reporter/keeper/reporter.go
index d1da14332..13e65ba0f 100644
--- a/x/reporter/keeper/reporter.go
+++ b/x/reporter/keeper/reporter.go
@@ -86,7 +86,7 @@ func (k Keeper) ReporterStake(ctx context.Context, repAddr sdk.AccAddress, query
 	}
 	if changed {
 		// Store per-report snapshot for disputes
-		err = k.Report.Set(ctx, collections.Join(queryId, collections.Join(repAddr.Bytes(), uint64(sdk.UnwrapSDKContext(ctx).BlockHeight()))), types.DelegationsAmounts{TokenOrigins: delegates, Total: totalTokens})
+		err = k.ReportByBlock.Set(ctx, collections.Join3(repAddr.Bytes(), uint64(sdk.UnwrapSDKContext(ctx).BlockHeight()), queryId), types.DelegationsAmounts{TokenOrigins: delegates, Total: totalTokens})
 		if err != nil {
 			return math.Int{}, err
 		}
@@ -330,7 +330,7 @@ func (k Keeper) GetReporterStake(ctx context.Context, repAddr sdk.AccAddress) (m
 
 // Stores the token origins for each selector which is needed during a dispute for slashing/returning tokens to appropriate parties
 func (k Keeper) SetReporterStakeByQueryId(ctx context.Context, repAddr sdk.AccAddress, delegates []*types.TokenOriginInfo, totalTokens math.Int, queryId []byte) error {
-	return k.Report.Set(ctx, collections.Join(queryId, collections.Join(repAddr.Bytes(), uint64(sdk.UnwrapSDKContext(ctx).BlockHeight()))), types.DelegationsAmounts{TokenOrigins: delegates, Total: totalTokens})
+	return k.ReportByBlock.Set(ctx, collections.Join3(repAddr.Bytes(), uint64(sdk.UnwrapSDKContext(ctx).BlockHeight()), queryId), types.DelegationsAmounts{TokenOrigins: delegates, Total: totalTokens})
 }
 
 // handlePeriodTracking manages reward period tracking for a reporter.
diff --git a/x/reporter/types/keys.go b/x/reporter/types/keys.go
index a4df0e57a..4b0bba328 100644
--- a/x/reporter/types/keys.go
+++ b/x/reporter/types/keys.go
@@ -36,4 +36,6 @@ var (
 	LastValSetUpdateHeightPrefix   = collections.NewPrefix(25)
 	StakeRecalcFlagPrefix          = collections.NewPrefix(26)
 	RecalcAtTimePrefix             = collections.NewPrefix(27)
+	ReportByBlockPrefix            = collections.NewPrefix(28)
+	ReportByBlockNumberIndexPrefix = collections.NewPrefix(29)
 )