Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions evmd/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ type EVMD struct {
appCodec codec.Codec
interfaceRegistry types.InterfaceRegistry
txConfig client.TxConfig
clientCtx client.Context

pendingTxListeners []evmante.PendingTxListener

Expand Down Expand Up @@ -405,7 +404,7 @@ func NewExampleApp(

app.GovKeeper = *govKeeper.SetHooks(
govtypes.NewMultiGovHooks(
// register the governance hooks
// register the governance hooks
),
)

Expand Down Expand Up @@ -1073,10 +1072,6 @@ func (app *EVMD) GetTxConfig() client.TxConfig {
return app.txConfig
}

func (app *EVMD) SetClientCtx(clientCtx client.Context) { // TODO:VLAD - Remove this if possible
app.clientCtx = clientCtx
}

// Close unsubscribes from the CometBFT event bus (if set) and closes the mempool and underlying BaseApp.
func (app *EVMD) Close() error {
var err error
Expand Down
1 change: 0 additions & 1 deletion evmd/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ func (app *EVMD) configureEVMMempool(appOpts servertypes.AppOptions, logger log.
app.EVMKeeper,
app.FeeMarketKeeper,
app.txConfig,
app.clientCtx,
mempoolConfig,
cosmosPoolMaxTx,
)
Expand Down
7 changes: 7 additions & 0 deletions mempool/check_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
"github.com/cosmos/evm/mempool/txpool"
)

// NewCheckTxHandler creates a CheckTx handler that integrates with the EVM mempool for transaction validation.
Expand All @@ -28,6 +29,12 @@ func NewCheckTxHandler(mempool *ExperimentalEVMMempool) types.CheckTxHandler {
// anything else, return regular error
return sdkerrors.ResponseCheckTxWithEvents(err, gInfo.GasWanted, gInfo.GasUsed, anteEvents, false), nil
}
// If its already known, this can mean the the tx was promoted from nonce gap to valid
// and by allowing ErrAlreadyKnown to be silent, we allow re-gossiping of such txs
// this also covers the case of re-submission of the same tx enforcing overpricing for replacement
if errors.Is(err, txpool.ErrAlreadyKnown) {
return sdkerrors.ResponseCheckTxWithEvents(nil, gInfo.GasWanted, gInfo.GasUsed, anteEvents, false), nil
}

return &abci.ResponseCheckTx{
GasWanted: int64(gInfo.GasWanted), // #nosec G115 -- this is copied from the Cosmos SDK
Expand Down
76 changes: 54 additions & 22 deletions mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"sync"

"github.com/ethereum/go-ethereum/common"
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/holiman/uint256"

Expand All @@ -28,6 +29,11 @@ import (

var _ sdkmempool.ExtMempool = &ExperimentalEVMMempool{}

// AllowUnsafeSyncInsert indicates whether to perform synchronous inserts into the mempool
// for testing purposes. When true, Insert will block until the transaction is fully processed.
// This should be used only in tests to ensure deterministic behavior
var AllowUnsafeSyncInsert = false

const (
// SubscriberName is the name of the event bus subscriber for the EVM mempool
SubscriberName = "evm"
Expand All @@ -53,6 +59,7 @@ type (
logger log.Logger
txConfig client.TxConfig
blockchain *Blockchain
clientCtx client.Context
blockGasLimit uint64 // Block gas limit from consensus parameters
minTip *uint256.Int

Expand Down Expand Up @@ -88,7 +95,6 @@ func NewExperimentalEVMMempool(
vmKeeper VMKeeperI,
feeMarketKeeper FeeMarketKeeperI,
txConfig client.TxConfig,
clientCtx client.Context,
config *EVMMempoolConfig,
cosmosPoolMaxTx int,
) *ExperimentalEVMMempool {
Expand Down Expand Up @@ -121,19 +127,6 @@ func NewExperimentalEVMMempool(

legacyPool := legacypool.New(legacyConfig, blockchain)

// Set up broadcast function using clientCtx
if config.BroadCastTxFn != nil {
legacyPool.BroadcastTxFn = config.BroadCastTxFn
} else {
// Create default broadcast function using clientCtx.
// The EVM mempool will broadcast transactions when it promotes them
// from queued into pending, noting their readiness to be executed.
legacyPool.BroadcastTxFn = func(txs []*ethtypes.Transaction) error {
logger.Debug("broadcasting EVM transactions", "tx_count", len(txs))
return broadcastEVMTransactions(clientCtx, txConfig, txs)
}
}

txPool, err := txpool.New(uint64(0), blockchain, []txpool.SubPool{legacyPool})
if err != nil {
panic(err)
Expand Down Expand Up @@ -179,6 +172,7 @@ func NewExperimentalEVMMempool(
cosmosPoolConfig.MaxTx = cosmosPoolMaxTx
cosmosPool = sdkmempool.NewPriorityMempool(*cosmosPoolConfig)

// Create the evmMempool
evmMempool := &ExperimentalEVMMempool{
vmKeeper: vmKeeper,
txPool: txPool,
Expand All @@ -192,6 +186,16 @@ func NewExperimentalEVMMempool(
anteHandler: config.AnteHandler,
}

// Set up broadcast function
if config.BroadCastTxFn != nil {
legacyPool.BroadcastTxFn = config.BroadCastTxFn
} else {
// Create default broadcast function using clientCtx.
// The EVM mempool will broadcast transactions when it promotes them
// from queued into pending, noting their readiness to be executed.
legacyPool.BroadcastTxFn = evmMempool.defaultBroadcastTxFn
}

vmKeeper.SetEvmMempool(evmMempool)

return evmMempool
Expand All @@ -209,6 +213,11 @@ func (m *ExperimentalEVMMempool) GetTxPool() *txpool.TxPool {
return m.txPool
}

// SetClientCtx sets the client context provider for broadcasting transactions
func (m *ExperimentalEVMMempool) SetClientCtx(clientCtx client.Context) {
m.clientCtx = clientCtx
}

// Insert adds a transaction to the appropriate mempool (EVM or Cosmos).
// EVM transactions are routed to the EVM transaction pool, while all other
// transactions are inserted into the Cosmos sdkmempool. The method assumes
Expand All @@ -227,7 +236,7 @@ func (m *ExperimentalEVMMempool) Insert(goCtx context.Context, tx sdk.Tx) error
hash := ethMsg.Hash()
m.logger.Debug("inserting EVM transaction", "tx_hash", hash)
ethTxs := []*ethtypes.Transaction{ethMsg.AsTransaction()}
errs := m.txPool.Add(ethTxs, true)
errs := m.txPool.Add(ethTxs, AllowUnsafeSyncInsert)
if len(errs) > 0 && errs[0] != nil {
m.logger.Error("failed to insert EVM transaction", "error", errs[0], "tx_hash", hash)
return errs[0]
Expand Down Expand Up @@ -416,6 +425,16 @@ func (m *ExperimentalEVMMempool) HasEventBus() bool {
return m.eventBus != nil
}

// Has returns true if the transaction with the given hash is already in the mempool.
// This checks tx pool for EVM transactions, which iterates through all pools (currently only legacypool)
func (m *ExperimentalEVMMempool) Has(hash common.Hash) bool {
m.mtx.Lock()
defer m.mtx.Unlock()

// Check the tx pool
return m.txPool.Has(hash)
}

// Close unsubscribes from the CometBFT event bus and shuts down the mempool.
func (m *ExperimentalEVMMempool) Close() error {
var errs []error
Expand Down Expand Up @@ -477,20 +496,33 @@ func (m *ExperimentalEVMMempool) getIterators(goCtx context.Context, i [][]byte)
return orderedEVMPendingTxes, cosmosPendingTxes
}

// defaultBroadcastTxFn is the default function for broadcasting EVM transactions
// using the configured client context
func (m *ExperimentalEVMMempool) defaultBroadcastTxFn(txs []*ethtypes.Transaction) error {
m.logger.Debug("broadcasting EVM transactions", "tx_count", len(txs))

// Apply the broadcast EVM transactions using the client context
return broadcastEVMTransactions(m.clientCtx, txs)
}

// broadcastEVMTransactions converts Ethereum transactions to Cosmos SDK format and broadcasts them.
// This function wraps EVM transactions in MsgEthereumTx messages and submits them to the network
// using the provided client context. It handles encoding and error reporting for each transaction.
func broadcastEVMTransactions(clientCtx client.Context, txConfig client.TxConfig, ethTxs []*ethtypes.Transaction) error {
func broadcastEVMTransactions(clientCtx client.Context, ethTxs []*ethtypes.Transaction) error {
for _, ethTx := range ethTxs {
msg := &evmtypes.MsgEthereumTx{}
msg.FromEthereumTx(ethTx)

txBuilder := txConfig.NewTxBuilder()
if err := txBuilder.SetMsgs(msg); err != nil {
return fmt.Errorf("failed to set msg in tx builder: %w", err)
ethSigner := ethtypes.LatestSigner(evmtypes.GetEthChainConfig())
if err := msg.FromSignedEthereumTx(ethTx, ethSigner); err != nil {
return fmt.Errorf("failed to convert ethereum transaction: %w", err)
}

cosmosTx, err := msg.BuildTx(clientCtx.TxConfig.NewTxBuilder(), evmtypes.GetEVMCoinDenom())
if err != nil {
return fmt.Errorf("failed to build cosmos tx: %w", err)
}

txBytes, err := txConfig.TxEncoder()(txBuilder.GetTx())
txBytes, err := clientCtx.TxConfig.TxEncoder()(cosmosTx)
if err != nil {
return fmt.Errorf("failed to encode transaction: %w", err)
}
Expand All @@ -499,7 +531,7 @@ func broadcastEVMTransactions(clientCtx client.Context, txConfig client.TxConfig
if err != nil {
return fmt.Errorf("failed to broadcast transaction %s: %w", ethTx.Hash().Hex(), err)
}
if res.Code != 0 {
if res.Code != 0 && res.Code != 19 && res.RawLog != "already known" {
return fmt.Errorf("transaction %s rejected by mempool: code=%d, log=%s", ethTx.Hash().Hex(), res.Code, res.RawLog)
}
}
Expand Down
7 changes: 7 additions & 0 deletions rpc/backend/call_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"google.golang.org/grpc/status"

"github.com/cosmos/evm/mempool"
"github.com/cosmos/evm/mempool/txpool"
rpctypes "github.com/cosmos/evm/rpc/types"
evmtypes "github.com/cosmos/evm/x/vm/types"

Expand Down Expand Up @@ -148,6 +149,12 @@ func (b *Backend) SendRawTransaction(data hexutil.Bytes) (common.Hash, error) {

txHash := ethereumTx.AsTransaction().Hash()

// Check if transaction is already in the mempool before broadcasting
// This is important for user-submitted transactions via JSON-RPC to provide proper error feedback
if b.Mempool != nil && b.Mempool.Has(txHash) {
return txHash, txpool.ErrAlreadyKnown
}

syncCtx := b.ClientCtx.WithBroadcastMode(flags.BroadcastSync)
rsp, err := syncCtx.BroadcastTx(txBytes)
if rsp != nil && rsp.Code != 0 {
Expand Down
16 changes: 7 additions & 9 deletions server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ type Application interface {
types.Application
AppWithPendingTxStream
GetMempool() sdkmempool.ExtMempool
SetClientCtx(clientCtx client.Context)
}

// AppCreator is a function that allows us to lazily initialize an application implementing with AppWithPendingTxStream.
Expand Down Expand Up @@ -137,7 +136,7 @@ which accepts a path for the resulting pprof file.
if !withbft {
serverCtx.Logger.Info("starting ABCI without CometBFT")
return wrapCPUProfile(serverCtx, func() error {
return startStandAlone(serverCtx, clientCtx, opts)
return startStandAlone(serverCtx, opts)
})
}

Expand Down Expand Up @@ -247,7 +246,7 @@ which accepts a path for the resulting pprof file.
// Parameters:
// - svrCtx: The context object that holds server configurations, logger, and other stateful information.
// - opts: Options for starting the server, including functions for creating the application and opening the database.
func startStandAlone(svrCtx *server.Context, clientCtx client.Context, opts StartOptions) error {
func startStandAlone(svrCtx *server.Context, opts StartOptions) error {
addr := svrCtx.Viper.GetString(srvflags.Address)
transport := svrCtx.Viper.GetString(srvflags.Transport)
home := svrCtx.Viper.GetString(flags.FlagHome)
Expand Down Expand Up @@ -278,11 +277,6 @@ func startStandAlone(svrCtx *server.Context, clientCtx client.Context, opts Star
svrCtx.Logger.Error("close application failed", "error", err.Error())
}
}()
evmApp, ok := app.(Application)
if !ok {
svrCtx.Logger.Error("failed to get server config", "error", err.Error())
}
evmApp.SetClientCtx(clientCtx)

config, err := cosmosevmserverconfig.GetConfig(svrCtx.Viper)
if err != nil {
Expand Down Expand Up @@ -401,7 +395,6 @@ func startInProcess(svrCtx *server.Context, clientCtx client.Context, opts Start
if !ok {
svrCtx.Logger.Error("failed to get server config", "error", err.Error())
}
evmApp.SetClientCtx(clientCtx)

nodeKey, err := p2p.LoadOrGenNodeKey(cfg.NodeKeyFile())
if err != nil {
Expand Down Expand Up @@ -463,6 +456,11 @@ func startInProcess(svrCtx *server.Context, clientCtx client.Context, opts Start
app.RegisterTxService(clientCtx)
app.RegisterTendermintService(clientCtx)
app.RegisterNodeService(clientCtx, config.Config)

// Set the clientCtx into the mempool
if m, ok := evmApp.GetMempool().(*evmmempool.ExperimentalEVMMempool); ok && m != nil {
m.SetClientCtx(clientCtx)
}
}

metrics, err := startTelemetry(config)
Expand Down
4 changes: 4 additions & 0 deletions tests/integration/mempool/test_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/stretchr/testify/suite"

evmmempool "github.com/cosmos/evm/mempool"
testconstants "github.com/cosmos/evm/testutil/constants"
"github.com/cosmos/evm/testutil/integration/evm/factory"
"github.com/cosmos/evm/testutil/integration/evm/grpc"
Expand Down Expand Up @@ -80,6 +81,9 @@ func (s *IntegrationTestSuite) SetupTestWithChainID(chainID testconstants.ChainI
initialCount := mempool.CountTx()
s.Require().Equal(0, initialCount, "mempool should be empty initially")

// Enforces deterministic mempool state for tests
evmmempool.AllowUnsafeSyncInsert = true

s.network = nw
s.factory = tf
}
Expand Down
2 changes: 1 addition & 1 deletion tests/systemtests/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestTxsReplacement(t *testing.T) {
}

func TestExceptions(t *testing.T) {
mempool.TestTxRebroadcasting(t)
mempool.TestRunTxBroadcasting(t)
mempool.TestMinimumGasPricesZero(t)
}

Expand Down
Loading
Loading