diff --git a/beacon/blockchain/interfaces.go b/beacon/blockchain/interfaces.go index 2dbd174b68..6604fb33c8 100644 --- a/beacon/blockchain/interfaces.go +++ b/beacon/blockchain/interfaces.go @@ -163,6 +163,7 @@ type BlockchainI interface { sdk.Context, *ctypes.BeaconBlock, ) error + PruneOrphanedBlobs(lastBlockHeight int64) error } // BlobProcessor is the interface for the blobs processor. diff --git a/beacon/blockchain/service.go b/beacon/blockchain/service.go index 8d5416b9fe..64ae2d503d 100644 --- a/beacon/blockchain/service.go +++ b/beacon/blockchain/service.go @@ -22,6 +22,7 @@ package blockchain import ( "context" + "fmt" "sync" "sync/atomic" @@ -127,3 +128,34 @@ func (s *Service) Stop() error { func (s *Service) StorageBackend() StorageBackend { return s.storageBackend } + +// PruneOrphanedBlobs removes any orphaned blob sidecars that may exist from incomplete block finalization. +func (s *Service) PruneOrphanedBlobs(lastBlockHeight int64) error { + orphanedSlot := math.Slot(lastBlockHeight + 1) // #nosec G115 + + // Check if any blob sidecars exist at the potentially orphaned slot + sidecars, err := s.storageBackend.AvailabilityStore().GetBlobSidecars(orphanedSlot) + if err != nil { + return fmt.Errorf("failed to read blob sidecars at slot %d: %w", orphanedSlot, err) + } + + // If no sidecars exist at this slot, nothing to clean up + if len(sidecars) == 0 { + return nil + } + + // Sidecars exist at this slot - they are orphaned, so delete them + s.logger.Warn("Found orphaned blob sidecars from incomplete block finalization, removing", + "slot", orphanedSlot.Base10(), + "num_sidecars", len(sidecars), + ) + + err = s.storageBackend.AvailabilityStore().DeleteBlobSidecars(orphanedSlot) + if err != nil { + return fmt.Errorf("failed to delete orphaned sidecars at slot %d: %w", orphanedSlot, err) + } + + s.logger.Info("Successfully removed orphaned blob sidecars", "slot", orphanedSlot.Base10()) + + return nil +} diff --git a/consensus/cometbft/service/service.go b/consensus/cometbft/service/service.go index de275a0c1a..dbb21e95ae 100644 --- a/consensus/cometbft/service/service.go +++ b/consensus/cometbft/service/service.go @@ -182,6 +182,11 @@ func NewService( } } + // Clean up any orphaned blob sidecars from incomplete block finalization. + if err = s.Blockchain.PruneOrphanedBlobs(lastBlockHeight); err != nil { + panic(fmt.Errorf("failed pruning orphaned blobs: %w", err)) + } + return s } diff --git a/da/store/interfaces.go b/da/store/interfaces.go index 7f52cb0c50..7cc9731678 100644 --- a/da/store/interfaces.go +++ b/da/store/interfaces.go @@ -34,4 +34,7 @@ type IndexDB interface { // exist in the DB for any reason (pruned, invalid index), an empty list is // returned with no error. GetByIndex(index uint64) ([][]byte, error) + + // DeleteByIndex removes all entries at the specified index + DeleteByIndex(index uint64) error } diff --git a/da/store/store.go b/da/store/store.go index b4f7470b01..df177720cb 100644 --- a/da/store/store.go +++ b/da/store/store.go @@ -114,3 +114,8 @@ func (s *Store) Persist(sidecars types.BlobSidecars) error { ) return nil } + +// DeleteBlobSidecars removes all blob sidecars for the specified slot. +func (s *Store) DeleteBlobSidecars(slot math.Slot) error { + return s.IndexDB.DeleteByIndex(slot.Unwrap()) +} diff --git a/storage/filedb/range_db.go b/storage/filedb/range_db.go index 976382d3f8..7cb17a7d4e 100644 --- a/storage/filedb/range_db.go +++ b/storage/filedb/range_db.go @@ -153,6 +153,20 @@ func (db *RangeDB) Prune(start, end uint64) error { return err } +// DeleteByIndex removes all entries at the specified index. +func (db *RangeDB) DeleteByIndex(index uint64) error { + db.rwMu.Lock() + defer db.rwMu.Unlock() + + path := fmt.Sprintf(pathFormat, index) + if err := db.coreDB.fs.RemoveAll(path); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("RangeDB DeleteByIndex failed to remove index %d: %w", index, err) + } + + // Note: We intentionally do not update lowerBoundIndex here. + return nil +} + // GetByIndex takes the database index and returns all associated entries, // expecting database keys to follow the prefix() format. If index does not // exist in the DB for any reason (pruned, invalid index), an empty list is diff --git a/storage/filedb/range_db_test.go b/storage/filedb/range_db_test.go index ed1431d681..efb7062cbf 100644 --- a/storage/filedb/range_db_test.go +++ b/storage/filedb/range_db_test.go @@ -354,6 +354,40 @@ func TestRangeDB_Invariants(t *testing.T) { } } +// =========================== DELETE BY INDEX ================================ + +// TestRangeDB_DeleteByIndex_DoesNotAffectLowerBound verifies the critical +// invariant that DeleteByIndex does not modify lowerBoundIndex, unlike Prune. +func TestRangeDB_DeleteByIndex_DoesNotAffectLowerBound(t *testing.T) { + t.Parallel() + rdb := file.NewRangeDB(newTestFDB("/tmp/testdb-deletebyindex")) + + // Populate indexes 1-10 + require.NoError(t, populateTestDB(rdb, 1, 10)) + + // Prune indexes 1-5, which sets lowerBoundIndex to 5 + require.NoError(t, rdb.Prune(1, 5)) + lowerBoundBefore := getFirstNonNilIndex(rdb) + require.Equal(t, uint64(5), lowerBoundBefore, "lowerBoundIndex should be 5 after pruning") + + // Delete index 7 using DeleteByIndex + require.NoError(t, rdb.DeleteByIndex(7)) + exists, err := rdb.Has(7, []byte("key")) + require.NoError(t, err) + require.False(t, exists, "index 7 should be deleted") + + // Verify lowerBoundIndex was NOT changed + lowerBoundAfter := getFirstNonNilIndex(rdb) + require.Equal(t, lowerBoundBefore, lowerBoundAfter, "DeleteByIndex must not modify lowerBoundIndex") + + // Prune again to verify DeleteByIndex didn't break the pruning mechanism + require.NoError(t, rdb.Prune(5, 8)) + + // Verify the second prune worked correctly + lowerBoundAfter = getFirstNonNilIndex(rdb) + require.Equal(t, uint64(8), lowerBoundAfter, "second Prune should update lowerBoundIndex to 8") +} + // =============================== HELPERS ================================== // newTestFDB returns a new file DB instance with an in-memory filesystem. diff --git a/testing/simulated/orphaned_blobs_test.go b/testing/simulated/orphaned_blobs_test.go new file mode 100644 index 0000000000..e4e2b999ba --- /dev/null +++ b/testing/simulated/orphaned_blobs_test.go @@ -0,0 +1,103 @@ +//go:build simulated + +// SPDX-License-Identifier: BUSL-1.1 +// +// Copyright (C) 2025, Berachain Foundation. All rights reserved. +// Use of this software is governed by the Business Source License included +// in the LICENSE file of this repository and at www.mariadb.com/bsl11. +// +// ANY USE OF THE LICENSED WORK IN VIOLATION OF THIS LICENSE WILL AUTOMATICALLY +// TERMINATE YOUR RIGHTS UNDER THIS LICENSE FOR THE CURRENT AND ALL OTHER +// VERSIONS OF THE LICENSED WORK. +// +// THIS LICENSE DOES NOT GRANT YOU ANY RIGHT IN ANY TRADEMARK OR LOGO OF +// LICENSOR OR ITS AFFILIATES (PROVIDED THAT YOU MAY USE A TRADEMARK OR LOGO OF +// LICENSOR AS EXPRESSLY REQUIRED BY THIS LICENSE). +// +// TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON +// AN "AS IS" BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS, +// EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND +// TITLE. + +package simulated_test + +import ( + "time" + + ctypes "github.com/berachain/beacon-kit/consensus-types/types" + "github.com/berachain/beacon-kit/da/kzg" + datypes "github.com/berachain/beacon-kit/da/types" + "github.com/berachain/beacon-kit/primitives/common" + "github.com/berachain/beacon-kit/primitives/crypto" + "github.com/berachain/beacon-kit/primitives/eip4844" + "github.com/berachain/beacon-kit/primitives/math" + "github.com/berachain/beacon-kit/testing/simulated" + "github.com/stretchr/testify/require" +) + +// TestOrphanedBlobCleanup tests that orphaned blob sidecars are properly cleaned up on node restart. +// This simulates the scenario where sidecars are saved to disk but the block finalization fails. +func (s *SimulatedSuite) TestOrphanedBlobCleanup() { + // Initialize chain and move forward two blocks. + s.InitializeChain(s.T()) + nodeAddress, err := s.SimComet.GetNodeAddress() + s.Require().NoError(err) + s.SimComet.Comet.SetNodeAddress(nodeAddress) + + _, _, proposalTime := s.MoveChainToHeight(s.T(), 1, 2, nodeAddress, time.Now()) + + // Get the last committed block height. + lastBlockHeight := s.SimComet.Comet.CommitMultiStore().LastCommitID().Version + orphanedSlot := math.Slot(lastBlockHeight + 1) + + // Create and persist orphaned blob sidecars. + // This simulates FinalizeSidecars succeeding but finalizeBeaconBlock failing. + orphanedSidecars := createOrphanedSidecars(s.T(), orphanedSlot, s.TestNode.KZGVerifier) + err = s.TestNode.StorageBackend.AvailabilityStore().Persist(orphanedSidecars) + s.Require().NoError(err) + + // Verify orphaned blobs exist. + sidecars, err := s.TestNode.StorageBackend.AvailabilityStore().GetBlobSidecars(orphanedSlot) + s.Require().NoError(err) + s.Require().Len(sidecars, 1) + + // Simulate node restart by calling PruneOrphanedBlobs. + err = s.TestNode.Blockchain.PruneOrphanedBlobs(lastBlockHeight) + s.Require().NoError(err) + + // Verify orphaned blobs were cleaned up. + sidecars, err = s.TestNode.StorageBackend.AvailabilityStore().GetBlobSidecars(orphanedSlot) + s.Require().NoError(err) + s.Require().Empty(sidecars) + + // Verify chain continues normally. + proposals, _, _ := s.MoveChainToHeight(s.T(), 3, 1, nodeAddress, proposalTime) + s.Require().Len(proposals, 1) +} + +// createOrphanedSidecars creates fake blob sidecars for testing orphaned blob cleanup. +func createOrphanedSidecars( + t require.TestingT, + slot math.Slot, + verifier kzg.BlobProofVerifier, +) datypes.BlobSidecars { + blobs := []*eip4844.Blob{{1, 2, 3}} + proofs, commitments := simulated.GetProofAndCommitmentsForBlobs(require.New(t), blobs, verifier) + + sidecars := make(datypes.BlobSidecars, len(blobs)) + for i := range blobs { + sidecars[i] = datypes.BuildBlobSidecar( + math.U64(i), + &ctypes.SignedBeaconBlockHeader{ + Header: &ctypes.BeaconBlockHeader{Slot: slot}, + Signature: crypto.BLSSignature{}, + }, + blobs[i], + commitments[i], + proofs[i], + make([]common.Root, ctypes.KZGInclusionProofDepth), + ) + } + return sidecars +} diff --git a/testing/simulated/testnode.go b/testing/simulated/testnode.go index 8c14f24629..2993bc0e8c 100644 --- a/testing/simulated/testnode.go +++ b/testing/simulated/testnode.go @@ -70,6 +70,7 @@ type ValidatorAPI interface { type TestNode struct { nodetypes.Node StorageBackend blockchain.StorageBackend + Blockchain *blockchain.Service ChainSpec chain.Spec APIBackend ValidatorAPI SimComet *SimComet @@ -128,6 +129,7 @@ func buildNode( simComet *SimComet config *config.Config storageBackend blockchain.StorageBackend + blockchain *blockchain.Service chainSpec chain.Spec engineClient *client.EngineClient stateProcessor *core.StateProcessor @@ -153,6 +155,7 @@ func buildNode( &simComet, &config, &storageBackend, + &blockchain, &chainSpec, &engineClient, &stateProcessor, @@ -172,6 +175,7 @@ func buildNode( return TestNode{ Node: beaconNode, StorageBackend: storageBackend, + Blockchain: blockchain, ChainSpec: chainSpec, APIBackend: apiServer.GetBeaconHandler(), SimComet: simComet,