Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions beacon/blockchain/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ type BlockchainI interface {
sdk.Context,
*ctypes.BeaconBlock,
) error
PruneOrphanedBlobs(lastBlockHeight int64) error
}

// BlobProcessor is the interface for the blobs processor.
Expand Down
32 changes: 32 additions & 0 deletions beacon/blockchain/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package blockchain

import (
"context"
"fmt"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -127,3 +128,34 @@ func (s *Service) Stop() error {
func (s *Service) StorageBackend() StorageBackend {
return s.storageBackend
}

// PruneOrphanedBlobs removes any orphaned blob sidecars that may exist from incomplete block finalization.
func (s *Service) PruneOrphanedBlobs(lastBlockHeight int64) error {
orphanedSlot := math.Slot(lastBlockHeight + 1) // #nosec G115

// Check if any blob sidecars exist at the potentially orphaned slot
sidecars, err := s.storageBackend.AvailabilityStore().GetBlobSidecars(orphanedSlot)
if err != nil {
return fmt.Errorf("failed to read blob sidecars at slot %d: %w", orphanedSlot, err)
}

// If no sidecars exist at this slot, nothing to clean up
if len(sidecars) == 0 {
return nil
}

// Sidecars exist at this slot - they are orphaned, so delete them
s.logger.Warn("Found orphaned blob sidecars from incomplete block finalization, removing",
"slot", orphanedSlot.Base10(),
"num_sidecars", len(sidecars),
)

err = s.storageBackend.AvailabilityStore().DeleteBlobSidecars(orphanedSlot)
if err != nil {
return fmt.Errorf("failed to delete orphaned sidecars at slot %d: %w", orphanedSlot, err)
}

s.logger.Info("Successfully removed orphaned blob sidecars", "slot", orphanedSlot.Base10())

return nil
}
5 changes: 5 additions & 0 deletions consensus/cometbft/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ func NewService(
}
}

// Clean up any orphaned blob sidecars from incomplete block finalization.
if err = s.Blockchain.PruneOrphanedBlobs(lastBlockHeight); err != nil {
panic(fmt.Errorf("failed pruning orphaned blobs: %w", err))
}

return s
}

Expand Down
3 changes: 3 additions & 0 deletions da/store/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,7 @@ type IndexDB interface {
// exist in the DB for any reason (pruned, invalid index), an empty list is
// returned with no error.
GetByIndex(index uint64) ([][]byte, error)

// DeleteByIndex removes all entries at the specified index
DeleteByIndex(index uint64) error
}
5 changes: 5 additions & 0 deletions da/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,8 @@ func (s *Store) Persist(sidecars types.BlobSidecars) error {
)
return nil
}

// DeleteBlobSidecars removes all blob sidecars for the specified slot.
func (s *Store) DeleteBlobSidecars(slot math.Slot) error {
return s.IndexDB.DeleteByIndex(slot.Unwrap())
}
14 changes: 14 additions & 0 deletions storage/filedb/range_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,20 @@ func (db *RangeDB) Prune(start, end uint64) error {
return err
}

// DeleteByIndex removes all entries at the specified index.
func (db *RangeDB) DeleteByIndex(index uint64) error {
db.rwMu.Lock()
defer db.rwMu.Unlock()

path := fmt.Sprintf(pathFormat, index)
if err := db.coreDB.fs.RemoveAll(path); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("RangeDB DeleteByIndex failed to remove index %d: %w", index, err)
}

// Note: We intentionally do not update lowerBoundIndex here.
return nil
}

// GetByIndex takes the database index and returns all associated entries,
// expecting database keys to follow the prefix() format. If index does not
// exist in the DB for any reason (pruned, invalid index), an empty list is
Expand Down
34 changes: 34 additions & 0 deletions storage/filedb/range_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,40 @@ func TestRangeDB_Invariants(t *testing.T) {
}
}

// =========================== DELETE BY INDEX ================================

// TestRangeDB_DeleteByIndex_DoesNotAffectLowerBound verifies the critical
// invariant that DeleteByIndex does not modify lowerBoundIndex, unlike Prune.
func TestRangeDB_DeleteByIndex_DoesNotAffectLowerBound(t *testing.T) {
t.Parallel()
rdb := file.NewRangeDB(newTestFDB("/tmp/testdb-deletebyindex"))

// Populate indexes 1-10
require.NoError(t, populateTestDB(rdb, 1, 10))

// Prune indexes 1-5, which sets lowerBoundIndex to 5
require.NoError(t, rdb.Prune(1, 5))
lowerBoundBefore := getFirstNonNilIndex(rdb)
require.Equal(t, uint64(5), lowerBoundBefore, "lowerBoundIndex should be 5 after pruning")

// Delete index 7 using DeleteByIndex
require.NoError(t, rdb.DeleteByIndex(7))
exists, err := rdb.Has(7, []byte("key"))
require.NoError(t, err)
require.False(t, exists, "index 7 should be deleted")

// Verify lowerBoundIndex was NOT changed
lowerBoundAfter := getFirstNonNilIndex(rdb)
require.Equal(t, lowerBoundBefore, lowerBoundAfter, "DeleteByIndex must not modify lowerBoundIndex")

// Prune again to verify DeleteByIndex didn't break the pruning mechanism
require.NoError(t, rdb.Prune(5, 8))

// Verify the second prune worked correctly
lowerBoundAfter = getFirstNonNilIndex(rdb)
require.Equal(t, uint64(8), lowerBoundAfter, "second Prune should update lowerBoundIndex to 8")
}

// =============================== HELPERS ==================================

// newTestFDB returns a new file DB instance with an in-memory filesystem.
Expand Down
103 changes: 103 additions & 0 deletions testing/simulated/orphaned_blobs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
//go:build simulated

// SPDX-License-Identifier: BUSL-1.1
//
// Copyright (C) 2025, Berachain Foundation. All rights reserved.
// Use of this software is governed by the Business Source License included
// in the LICENSE file of this repository and at www.mariadb.com/bsl11.
//
// ANY USE OF THE LICENSED WORK IN VIOLATION OF THIS LICENSE WILL AUTOMATICALLY
// TERMINATE YOUR RIGHTS UNDER THIS LICENSE FOR THE CURRENT AND ALL OTHER
// VERSIONS OF THE LICENSED WORK.
//
// THIS LICENSE DOES NOT GRANT YOU ANY RIGHT IN ANY TRADEMARK OR LOGO OF
// LICENSOR OR ITS AFFILIATES (PROVIDED THAT YOU MAY USE A TRADEMARK OR LOGO OF
// LICENSOR AS EXPRESSLY REQUIRED BY THIS LICENSE).
//
// TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
// AN "AS IS" BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
// EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
// TITLE.

package simulated_test

import (
"time"

ctypes "github.com/berachain/beacon-kit/consensus-types/types"
"github.com/berachain/beacon-kit/da/kzg"
datypes "github.com/berachain/beacon-kit/da/types"
"github.com/berachain/beacon-kit/primitives/common"
"github.com/berachain/beacon-kit/primitives/crypto"
"github.com/berachain/beacon-kit/primitives/eip4844"
"github.com/berachain/beacon-kit/primitives/math"
"github.com/berachain/beacon-kit/testing/simulated"
"github.com/stretchr/testify/require"
)

// TestOrphanedBlobCleanup tests that orphaned blob sidecars are properly cleaned up on node restart.
// This simulates the scenario where sidecars are saved to disk but the block finalization fails.
func (s *SimulatedSuite) TestOrphanedBlobCleanup() {
// Initialize chain and move forward two blocks.
s.InitializeChain(s.T())
nodeAddress, err := s.SimComet.GetNodeAddress()
s.Require().NoError(err)
s.SimComet.Comet.SetNodeAddress(nodeAddress)

_, _, proposalTime := s.MoveChainToHeight(s.T(), 1, 2, nodeAddress, time.Now())

// Get the last committed block height.
lastBlockHeight := s.SimComet.Comet.CommitMultiStore().LastCommitID().Version
orphanedSlot := math.Slot(lastBlockHeight + 1)

// Create and persist orphaned blob sidecars.
// This simulates FinalizeSidecars succeeding but finalizeBeaconBlock failing.
orphanedSidecars := createOrphanedSidecars(s.T(), orphanedSlot, s.TestNode.KZGVerifier)
err = s.TestNode.StorageBackend.AvailabilityStore().Persist(orphanedSidecars)
s.Require().NoError(err)

// Verify orphaned blobs exist.
sidecars, err := s.TestNode.StorageBackend.AvailabilityStore().GetBlobSidecars(orphanedSlot)
s.Require().NoError(err)
s.Require().Len(sidecars, 1)

// Simulate node restart by calling PruneOrphanedBlobs.
err = s.TestNode.Blockchain.PruneOrphanedBlobs(lastBlockHeight)
s.Require().NoError(err)

// Verify orphaned blobs were cleaned up.
sidecars, err = s.TestNode.StorageBackend.AvailabilityStore().GetBlobSidecars(orphanedSlot)
s.Require().NoError(err)
s.Require().Empty(sidecars)

// Verify chain continues normally.
proposals, _, _ := s.MoveChainToHeight(s.T(), 3, 1, nodeAddress, proposalTime)
s.Require().Len(proposals, 1)
}

// createOrphanedSidecars creates fake blob sidecars for testing orphaned blob cleanup.
func createOrphanedSidecars(
t require.TestingT,
slot math.Slot,
verifier kzg.BlobProofVerifier,
) datypes.BlobSidecars {
blobs := []*eip4844.Blob{{1, 2, 3}}
proofs, commitments := simulated.GetProofAndCommitmentsForBlobs(require.New(t), blobs, verifier)

sidecars := make(datypes.BlobSidecars, len(blobs))
for i := range blobs {
sidecars[i] = datypes.BuildBlobSidecar(
math.U64(i),
&ctypes.SignedBeaconBlockHeader{
Header: &ctypes.BeaconBlockHeader{Slot: slot},
Signature: crypto.BLSSignature{},
},
blobs[i],
commitments[i],
proofs[i],
make([]common.Root, ctypes.KZGInclusionProofDepth),
)
}
return sidecars
}
4 changes: 4 additions & 0 deletions testing/simulated/testnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type ValidatorAPI interface {
type TestNode struct {
nodetypes.Node
StorageBackend blockchain.StorageBackend
Blockchain *blockchain.Service
ChainSpec chain.Spec
APIBackend ValidatorAPI
SimComet *SimComet
Expand Down Expand Up @@ -128,6 +129,7 @@ func buildNode(
simComet *SimComet
config *config.Config
storageBackend blockchain.StorageBackend
blockchain *blockchain.Service
chainSpec chain.Spec
engineClient *client.EngineClient
stateProcessor *core.StateProcessor
Expand All @@ -153,6 +155,7 @@ func buildNode(
&simComet,
&config,
&storageBackend,
&blockchain,
&chainSpec,
&engineClient,
&stateProcessor,
Expand All @@ -172,6 +175,7 @@ func buildNode(
return TestNode{
Node: beaconNode,
StorageBackend: storageBackend,
Blockchain: blockchain,
ChainSpec: chainSpec,
APIBackend: apiServer.GetBeaconHandler(),
SimComet: simComet,
Expand Down
Loading