Skip to content

Commit 1764c72

Browse files
authored
Comm Component for Simplex (#3998)
Signed-off-by: Sam Liokumovich <[email protected]>
1 parent f7941d3 commit 1764c72

File tree

12 files changed

+600
-33
lines changed

12 files changed

+600
-33
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ require (
8787
github.com/Microsoft/go-winio v0.6.1 // indirect
8888
github.com/VictoriaMetrics/fastcache v1.12.1 // indirect
8989
github.com/ava-labs/firewood-go-ethhash/ffi v0.0.8 // indirect
90-
github.com/ava-labs/simplex v0.0.0-20250626192006-220e6aeacdc1
90+
github.com/ava-labs/simplex v0.0.0-20250715173145-e4fe035cb9b2
9191
github.com/beorn7/perks v1.0.1 // indirect
9292
github.com/bits-and-blooms/bitset v1.10.0 // indirect
9393
github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ github.com/ava-labs/ledger-avalanche/go v0.0.0-20241009183145-e6f90a8a1a60 h1:EL
7878
github.com/ava-labs/ledger-avalanche/go v0.0.0-20241009183145-e6f90a8a1a60/go.mod h1:/7qKobTfbzBu7eSTVaXMTr56yTYk4j2Px6/8G+idxHo=
7979
github.com/ava-labs/libevm v1.13.14-0.3.0.rc.1 h1:vBMYo+Iazw0rGTr+cwjkBdh5eadLPlv4ywI4lKye3CA=
8080
github.com/ava-labs/libevm v1.13.14-0.3.0.rc.1/go.mod h1:+Iol+sVQ1KyoBsHf3veyrBmHCXr3xXRWq6ZXkgVfNLU=
81-
github.com/ava-labs/simplex v0.0.0-20250626192006-220e6aeacdc1 h1:ipeWExRrhYF7DZ/bcigoQrzo3vZWNZrFx8W+Yg2sJ2Q=
82-
github.com/ava-labs/simplex v0.0.0-20250626192006-220e6aeacdc1/go.mod h1:GVzumIo3zR23/qGRN2AdnVkIPHcKMq/D89EGWZfMGQ0=
81+
github.com/ava-labs/simplex v0.0.0-20250715173145-e4fe035cb9b2 h1:PZ5PMEDkTbd6NLNiwKWV8nz7QvAM+QC9Rj3/NrL9ICA=
82+
github.com/ava-labs/simplex v0.0.0-20250715173145-e4fe035cb9b2/go.mod h1:GVzumIo3zR23/qGRN2AdnVkIPHcKMq/D89EGWZfMGQ0=
8383
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
8484
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
8585
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=

message/messagemock/outbound_message_builder.go

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

message/outbound_msg_builder.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,10 @@ type OutboundMsgBuilder interface {
181181
chainID ids.ID,
182182
msg []byte,
183183
) (OutboundMessage, error)
184+
185+
SimplexMessage(
186+
msg *p2p.Simplex,
187+
) (OutboundMessage, error)
184188
}
185189

186190
type outMsgBuilder struct {
@@ -725,3 +729,15 @@ func (b *outMsgBuilder) AppGossip(chainID ids.ID, msg []byte) (OutboundMessage,
725729
false,
726730
)
727731
}
732+
733+
func (b *outMsgBuilder) SimplexMessage(msg *p2p.Simplex) (OutboundMessage, error) {
734+
return b.builder.createOutbound(
735+
&p2p.Message{
736+
Message: &p2p.Message_Simplex{
737+
Simplex: msg,
738+
},
739+
},
740+
b.compressionType,
741+
false,
742+
)
743+
}

simplex/block.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ type blockDeserializer struct {
6767
parser block.Parser
6868
}
6969

70-
func (d *blockDeserializer) DeserializeBlock(bytes []byte) (simplex.Block, error) {
70+
func (d *blockDeserializer) DeserializeBlock(ctx context.Context, bytes []byte) (simplex.Block, error) {
7171
var canotoBlock canotoSimplexBlock
7272

7373
if err := canotoBlock.UnmarshalCanoto(bytes); err != nil {
@@ -79,7 +79,7 @@ func (d *blockDeserializer) DeserializeBlock(bytes []byte) (simplex.Block, error
7979
return nil, fmt.Errorf("failed to parse protocol metadata: %w", err)
8080
}
8181

82-
vmblock, err := d.parser.ParseBlock(context.TODO(), canotoBlock.InnerBlock)
82+
vmblock, err := d.parser.ParseBlock(ctx, canotoBlock.InnerBlock)
8383
if err != nil {
8484
return nil, err
8585
}

simplex/block_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121

2222
func TestBlockSerialization(t *testing.T) {
2323
unexpectedBlockBytes := errors.New("unexpected block bytes")
24-
24+
ctx := context.Background()
2525
testBlock := snowmantest.BuildChild(snowmantest.Genesis)
2626

2727
b := &Block{
@@ -88,7 +88,7 @@ func TestBlockSerialization(t *testing.T) {
8888
}
8989

9090
// Deserialize the block
91-
deserializedBlock, err := deserializer.DeserializeBlock(tt.blockBytes)
91+
deserializedBlock, err := deserializer.DeserializeBlock(ctx, tt.blockBytes)
9292
require.ErrorIs(t, err, tt.expectedError)
9393

9494
if tt.expectedError == nil {

simplex/bls_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,7 @@ import (
1313
)
1414

1515
func TestBLSVerifier(t *testing.T) {
16-
config, err := newEngineConfig()
17-
require.NoError(t, err)
16+
config := newEngineConfig(t, 1)
1817
signer, verifier := NewBLSAuth(config)
1918
otherNodeID := ids.GenerateTestNodeID()
2019

@@ -81,7 +80,7 @@ func TestBLSVerifier(t *testing.T) {
8180

8281
for _, tt := range tests {
8382
t.Run(tt.name, func(t *testing.T) {
84-
err = verifier.Verify(msg, tt.sig, tt.nodeID)
83+
err := verifier.Verify(msg, tt.sig, tt.nodeID)
8584
require.ErrorIs(t, err, tt.expectErr)
8685
})
8786
}

simplex/comm.go

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
2+
// See the file LICENSE for licensing terms.
3+
4+
package simplex
5+
6+
import (
7+
"errors"
8+
"fmt"
9+
10+
"github.com/ava-labs/simplex"
11+
"go.uber.org/zap"
12+
13+
"github.com/ava-labs/avalanchego/ids"
14+
"github.com/ava-labs/avalanchego/message"
15+
"github.com/ava-labs/avalanchego/proto/pb/p2p"
16+
"github.com/ava-labs/avalanchego/snow/engine/common"
17+
"github.com/ava-labs/avalanchego/snow/networking/sender"
18+
"github.com/ava-labs/avalanchego/subnets"
19+
"github.com/ava-labs/avalanchego/utils/set"
20+
)
21+
22+
var (
23+
_ simplex.Communication = (*Comm)(nil)
24+
errNodeNotFound = errors.New("node not found in the validator list")
25+
)
26+
27+
type Comm struct {
28+
logger simplex.Logger
29+
subnetID ids.ID
30+
chainID ids.ID
31+
// broadcastNodes are the nodes that should receive broadcast messages
32+
broadcastNodes set.Set[ids.NodeID]
33+
// allNodes are the IDs of all the nodes in the subnet
34+
allNodes []simplex.NodeID
35+
36+
// sender is used to send messages to other nodes
37+
sender sender.ExternalSender
38+
msgBuilder message.OutboundMsgBuilder
39+
}
40+
41+
func NewComm(config *Config) (*Comm, error) {
42+
if _, ok := config.Validators[config.Ctx.NodeID]; !ok {
43+
config.Log.Warn("Node is not a validator for the subnet",
44+
zap.Stringer("nodeID", config.Ctx.NodeID),
45+
zap.Stringer("chainID", config.Ctx.ChainID),
46+
zap.Stringer("subnetID", config.Ctx.SubnetID),
47+
)
48+
return nil, fmt.Errorf("our %w: %s", errNodeNotFound, config.Ctx.NodeID)
49+
}
50+
51+
broadcastNodes := set.NewSet[ids.NodeID](len(config.Validators) - 1)
52+
allNodes := make([]simplex.NodeID, 0, len(config.Validators))
53+
// grab all the nodes that are validators for the subnet
54+
for _, vd := range config.Validators {
55+
allNodes = append(allNodes, vd.NodeID[:])
56+
if vd.NodeID == config.Ctx.NodeID {
57+
continue // skip our own node ID
58+
}
59+
60+
broadcastNodes.Add(vd.NodeID)
61+
}
62+
63+
return &Comm{
64+
subnetID: config.Ctx.SubnetID,
65+
broadcastNodes: broadcastNodes,
66+
allNodes: allNodes,
67+
logger: config.Log,
68+
sender: config.Sender,
69+
msgBuilder: config.OutboundMsgBuilder,
70+
chainID: config.Ctx.ChainID,
71+
}, nil
72+
}
73+
74+
func (c *Comm) Nodes() []simplex.NodeID {
75+
return c.allNodes
76+
}
77+
78+
func (c *Comm) Send(msg *simplex.Message, destination simplex.NodeID) {
79+
outboundMsg, err := c.simplexMessageToOutboundMessage(msg)
80+
if err != nil {
81+
c.logger.Error("Failed creating message", zap.Error(err))
82+
return
83+
}
84+
85+
dest, err := ids.ToNodeID(destination)
86+
if err != nil {
87+
c.logger.Error("Failed to convert destination NodeID", zap.Error(err))
88+
return
89+
}
90+
91+
c.sender.Send(outboundMsg, common.SendConfig{NodeIDs: set.Of(dest)}, c.subnetID, subnets.NoOpAllower)
92+
}
93+
94+
func (c *Comm) Broadcast(msg *simplex.Message) {
95+
outboundMsg, err := c.simplexMessageToOutboundMessage(msg)
96+
if err != nil {
97+
c.logger.Error("Failed creating message", zap.Error(err))
98+
return
99+
}
100+
101+
c.sender.Send(outboundMsg, common.SendConfig{NodeIDs: c.broadcastNodes}, c.subnetID, subnets.NoOpAllower)
102+
}
103+
104+
func (c *Comm) simplexMessageToOutboundMessage(msg *simplex.Message) (message.OutboundMessage, error) {
105+
var simplexMsg *p2p.Simplex
106+
switch {
107+
case msg.VerifiedBlockMessage != nil:
108+
bytes, err := msg.VerifiedBlockMessage.VerifiedBlock.Bytes()
109+
if err != nil {
110+
return nil, fmt.Errorf("failed to serialize block: %w", err)
111+
}
112+
simplexMsg = newBlockProposal(c.chainID, bytes, msg.VerifiedBlockMessage.Vote)
113+
case msg.VoteMessage != nil:
114+
simplexMsg = newVote(c.chainID, msg.VoteMessage)
115+
case msg.EmptyVoteMessage != nil:
116+
simplexMsg = newEmptyVote(c.chainID, msg.EmptyVoteMessage)
117+
case msg.FinalizeVote != nil:
118+
simplexMsg = newFinalizeVote(c.chainID, msg.FinalizeVote)
119+
case msg.Notarization != nil:
120+
simplexMsg = newNotarization(c.chainID, msg.Notarization)
121+
case msg.EmptyNotarization != nil:
122+
simplexMsg = newEmptyNotarization(c.chainID, msg.EmptyNotarization)
123+
case msg.Finalization != nil:
124+
simplexMsg = newFinalization(c.chainID, msg.Finalization)
125+
case msg.ReplicationRequest != nil:
126+
simplexMsg = newReplicationRequest(c.chainID, msg.ReplicationRequest)
127+
case msg.VerifiedReplicationResponse != nil:
128+
msg, err := newReplicationResponse(c.chainID, msg.VerifiedReplicationResponse)
129+
if err != nil {
130+
return nil, fmt.Errorf("failed to create replication response: %w", err)
131+
}
132+
simplexMsg = msg
133+
default:
134+
return nil, fmt.Errorf("unknown message type: %+v", msg)
135+
}
136+
137+
return c.msgBuilder.SimplexMessage(simplexMsg)
138+
}

simplex/comm_test.go

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
2+
// See the file LICENSE for licensing terms.
3+
4+
package simplex
5+
6+
import (
7+
"testing"
8+
"time"
9+
10+
"github.com/ava-labs/simplex"
11+
"github.com/prometheus/client_golang/prometheus"
12+
"github.com/stretchr/testify/require"
13+
"go.uber.org/mock/gomock"
14+
15+
"github.com/ava-labs/avalanchego/ids"
16+
"github.com/ava-labs/avalanchego/message"
17+
"github.com/ava-labs/avalanchego/snow/engine/common"
18+
"github.com/ava-labs/avalanchego/snow/networking/sender/sendermock"
19+
"github.com/ava-labs/avalanchego/utils/constants"
20+
"github.com/ava-labs/avalanchego/utils/set"
21+
)
22+
23+
var testSimplexMessage = simplex.Message{
24+
VoteMessage: &simplex.Vote{
25+
Vote: simplex.ToBeSignedVote{
26+
BlockHeader: simplex.BlockHeader{
27+
ProtocolMetadata: simplex.ProtocolMetadata{
28+
Version: 1,
29+
Epoch: 1,
30+
Round: 1,
31+
Seq: 1,
32+
},
33+
},
34+
},
35+
Signature: simplex.Signature{
36+
Signer: []byte("dummy_node_id"),
37+
Value: []byte("dummy_signature"),
38+
},
39+
},
40+
}
41+
42+
func TestCommSendMessage(t *testing.T) {
43+
config := newEngineConfig(t, 1)
44+
45+
destinationNodeID := ids.GenerateTestNodeID()
46+
ctrl := gomock.NewController(t)
47+
sender := sendermock.NewExternalSender(ctrl)
48+
mc, err := message.NewCreator(
49+
prometheus.NewRegistry(),
50+
constants.DefaultNetworkCompressionType,
51+
10*time.Second,
52+
)
53+
require.NoError(t, err)
54+
55+
config.OutboundMsgBuilder = mc
56+
config.Sender = sender
57+
58+
comm, err := NewComm(config)
59+
require.NoError(t, err)
60+
61+
outboundMsg, err := mc.SimplexMessage(newVote(config.Ctx.ChainID, testSimplexMessage.VoteMessage))
62+
require.NoError(t, err)
63+
expectedSendConfig := common.SendConfig{
64+
NodeIDs: set.Of(destinationNodeID),
65+
}
66+
sender.EXPECT().Send(outboundMsg, expectedSendConfig, comm.subnetID, gomock.Any())
67+
68+
comm.Send(&testSimplexMessage, destinationNodeID[:])
69+
}
70+
71+
// TestCommBroadcast tests the Broadcast method sends to all nodes in the subnet
72+
// not including the sending node.
73+
func TestCommBroadcast(t *testing.T) {
74+
config := newEngineConfig(t, 3)
75+
76+
ctrl := gomock.NewController(t)
77+
sender := sendermock.NewExternalSender(ctrl)
78+
mc, err := message.NewCreator(
79+
prometheus.NewRegistry(),
80+
constants.DefaultNetworkCompressionType,
81+
10*time.Second,
82+
)
83+
require.NoError(t, err)
84+
85+
config.OutboundMsgBuilder = mc
86+
config.Sender = sender
87+
88+
comm, err := NewComm(config)
89+
require.NoError(t, err)
90+
outboundMsg, err := mc.SimplexMessage(newVote(config.Ctx.ChainID, testSimplexMessage.VoteMessage))
91+
require.NoError(t, err)
92+
nodes := make([]ids.NodeID, 0, len(comm.Nodes()))
93+
for _, node := range comm.Nodes() {
94+
if node.Equals(config.Ctx.NodeID[:]) {
95+
continue // skip the sending node
96+
}
97+
nodes = append(nodes, ids.NodeID(node))
98+
}
99+
100+
expectedSendConfig := common.SendConfig{
101+
NodeIDs: set.Of(nodes...),
102+
}
103+
104+
sender.EXPECT().Send(outboundMsg, expectedSendConfig, comm.subnetID, gomock.Any())
105+
106+
comm.Broadcast(&testSimplexMessage)
107+
}
108+
109+
func TestCommFailsWithoutCurrentNode(t *testing.T) {
110+
config := newEngineConfig(t, 3)
111+
112+
ctrl := gomock.NewController(t)
113+
mc, err := message.NewCreator(
114+
prometheus.NewRegistry(),
115+
constants.DefaultNetworkCompressionType,
116+
10*time.Second,
117+
)
118+
require.NoError(t, err)
119+
sender := sendermock.NewExternalSender(ctrl)
120+
121+
config.OutboundMsgBuilder = mc
122+
config.Sender = sender
123+
124+
// set the curNode to a different nodeID than the one in the config
125+
vdrs := generateTestNodes(t, 3)
126+
config.Validators = newTestValidatorInfo(vdrs)
127+
128+
_, err = NewComm(config)
129+
require.ErrorIs(t, err, errNodeNotFound)
130+
}

0 commit comments

Comments
 (0)