Skip to content

Commit 1cb7eb1

Browse files
committed
update gossip mesh
1 parent f9ddbf4 commit 1cb7eb1

File tree

6 files changed

+92
-19
lines changed

6 files changed

+92
-19
lines changed

src/internal/common/utils.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package common
2+
3+
func DeduplicateStrings(input []string) []string {
4+
output := []string{}
5+
seen := make(map[string]struct{})
6+
for _, s := range input {
7+
if _, ok := seen[s]; !ok {
8+
seen[s] = struct{}{}
9+
output = append(output, s)
10+
}
11+
}
12+
return output
13+
}

src/internal/protocol/clock.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,11 @@ func StartTicker() {
3737
// check if peer is still connected
3838
p, error := GetPeerFromTable(peer_id.String())
3939
if error == nil {
40-
p.Connected = true
41-
if peer_id != host.ID() && host.Network().Connectedness(peer_id) != network.Connected {
40+
if host.Network().Connectedness(peer_id) == network.Connected {
41+
p.Connected = true
42+
} else if peer_id != host.ID() && host.Network().Connectedness(peer_id) != network.Connected {
4243
// try to dial the peer, if cannot dial, then mark it as disconnected
44+
common.Logger.Info("Dialing ", peer_id.String(), "...")
4345
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
4446
defer cancel()
4547
addrInfo := libpeer.AddrInfo{ID: peer_id, Addrs: host.Peerstore().Addrs(peer_id)}
@@ -60,6 +62,8 @@ func StartTicker() {
6062
value, err := json.Marshal(p)
6163
if err == nil {
6264
UpdateNodeTableHook(ds.NewKey(peer_id.String()), value)
65+
} else {
66+
common.Logger.Error("Error while marshalling peer: ", peer_id.String(), err)
6367
}
6468
}
6569
}

src/internal/protocol/crdt.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,11 @@ func GetCRDTStore() (*crdt.Datastore, context.CancelFunc) {
3939

4040
ipfs, err = ipfslite.New(ctx, store, nil, host, &dht, nil)
4141
common.ReportError(err, "Error while creating ipfs lite node")
42-
43-
psub, err := pubsub.NewGossipSub(ctx, host)
42+
pubsubParams := pubsub.DefaultGossipSubParams()
43+
pubsubParams.D = 128
44+
pubsubParams.Dlo = 16
45+
pubsubParams.Dhi = 256
46+
psub, err := pubsub.NewGossipSub(ctx, host, pubsub.WithGossipSubParams(pubsubParams))
4447
common.ReportError(err, "Error while creating pubsub")
4548

4649
topic, err := psub.Join(pubsubNet)
@@ -57,6 +60,15 @@ func GetCRDTStore() (*crdt.Datastore, context.CancelFunc) {
5760
break
5861
}
5962
host.ConnManager().TagPeer(msg.ReceivedFrom, "keep", 100)
63+
// Update LastSeen when we receive a message from a peer
64+
p, gerr := GetPeerFromTable(msg.ReceivedFrom.String())
65+
if gerr != nil {
66+
p = Peer{ID: msg.ReceivedFrom.String()}
67+
}
68+
p.LastSeen = time.Now().Unix()
69+
if b, merr := json.Marshal(p); merr == nil {
70+
UpdateNodeTableHook(ds.NewKey(msg.ReceivedFrom.String()), b)
71+
}
6072
}
6173
}()
6274

src/internal/protocol/host.go

Lines changed: 57 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package protocol
33
import (
44
"context"
55
"crypto/rand"
6+
"encoding/json"
67
mrand "math/rand"
78
"ocf/internal/common"
89
"strconv"
@@ -21,6 +22,7 @@ import (
2122
"github.com/libp2p/go-libp2p/core/peer"
2223
"github.com/libp2p/go-libp2p/core/routing"
2324
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
25+
connmgr "github.com/libp2p/go-libp2p/p2p/net/connmgr"
2426
"github.com/libp2p/go-libp2p/p2p/security/noise"
2527
libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls"
2628
"github.com/spf13/viper"
@@ -52,15 +54,17 @@ func GetP2PNode(ds datastore.Batching) (host.Host, dualdht.DHT) {
5254
}
5355

5456
func newHost(ctx context.Context, seed int64, ds datastore.Batching) (host.Host, error) {
55-
// connmgr, err := connmgr.NewConnManager(
56-
// 50,
57-
// 500,
58-
// connmgr.WithGracePeriod(time.Minute),
59-
// )
60-
// if err != nil {
61-
// common.Logger.Error("Error while creating connection manager: %v", err)
62-
// }
6357
var err error
58+
// Connection manager: maintain a larger pool of connections so we can exceed
59+
// the pubsub mesh degree and keep more peers around.
60+
cm, err := connmgr.NewConnManager(
61+
100, // Low watermark
62+
800, // High watermark
63+
connmgr.WithGracePeriod(5*time.Minute),
64+
)
65+
if err != nil {
66+
common.Logger.Error("Error while creating connection manager: ", err)
67+
}
6468
var priv crypto.PrivKey
6569
// try to load the private key from file
6670
if seed == 0 {
@@ -103,9 +107,9 @@ func newHost(ctx context.Context, seed int64, ds datastore.Batching) (host.Host,
103107
System: systemLimits,
104108
// Keep default peer limits but increase them slightly
105109
PeerDefault: rcmgr.ResourceLimits{
106-
ConnsInbound: 16, // Allow more connections per peer
107-
ConnsOutbound: 16,
108-
Conns: 32,
110+
ConnsInbound: 512, // Allow more connections per peer
111+
ConnsOutbound: 512,
112+
Conns: 1024,
109113
},
110114
}.Build(limits)
111115

@@ -119,7 +123,7 @@ func newHost(ctx context.Context, seed int64, ds datastore.Batching) (host.Host,
119123
libp2p.DefaultTransports,
120124
libp2p.Identity(priv),
121125
libp2p.ResourceManager(mgr), // Use our custom resource manager
122-
// libp2p.ConnectionManager(connmgr),
126+
libp2p.ConnectionManager(cm),
123127
libp2p.NATPortMap(),
124128
libp2p.ListenAddrStrings(
125129
"/ip4/0.0.0.0/tcp/"+viper.GetString("tcpport"),
@@ -149,9 +153,45 @@ func newHost(ctx context.Context, seed int64, ds datastore.Batching) (host.Host,
149153
common.Logger.Info("Connected to peer: ", c.RemotePeer(), " Total connections: ", len(n.Conns()))
150154
// On (re)connections, re-announce local services
151155
go ReannounceLocalServices()
156+
157+
// Mark peer as connected in node table immediately
158+
go func(pid peer.ID) {
159+
// Avoid updating self
160+
if pid == host.ID() {
161+
return
162+
}
163+
p, err := GetPeerFromTable(pid.String())
164+
if err != nil {
165+
p = Peer{ID: pid.String()}
166+
}
167+
p.Connected = true
168+
p.LastSeen = time.Now().Unix()
169+
if b, e := json.Marshal(p); e == nil {
170+
UpdateNodeTableHook(datastore.NewKey(pid.String()), b)
171+
} else {
172+
common.Logger.Error("Failed to marshal peer on connect: ", e)
173+
}
174+
}(c.RemotePeer())
152175
},
153176
DisconnectedF: func(n network.Network, c network.Conn) {
154177
common.Logger.Info("Disconnected from peer: ", c.RemotePeer(), " Total connections: ", len(n.Conns()))
178+
// Mark peer as disconnected in node table immediately
179+
go func(pid peer.ID) {
180+
if pid == host.ID() {
181+
return
182+
}
183+
p, err := GetPeerFromTable(pid.String())
184+
if err != nil {
185+
p = Peer{ID: pid.String()}
186+
}
187+
p.Connected = false
188+
// keep LastSeen as last known good; do not bump here
189+
if b, e := json.Marshal(p); e == nil {
190+
UpdateNodeTableHook(datastore.NewKey(pid.String()), b)
191+
} else {
192+
common.Logger.Error("Failed to marshal peer on disconnect: ", e)
193+
}
194+
}(c.RemotePeer())
155195
},
156196
})
157197

@@ -247,6 +287,11 @@ func ConnectedBootstraps() []string {
247287
}
248288
}
249289
}
290+
// add myself as bootstrap
291+
myaddr := host.Addrs()[0].String() + "/p2p/" + host.ID().String()
292+
bootstraps = append(bootstraps, myaddr)
293+
// deduplicate
294+
bootstraps = common.DeduplicateStrings(bootstraps)
250295
return bootstraps
251296
}
252297

src/internal/protocol/node_table.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,6 @@ func UpdateNodeTableHook(key ds.Key, value []byte) {
124124
common.ReportError(err, "Error while unmarshalling peer")
125125
// Preserve locally computed connectivity status if we already know this peer
126126
if existing, ok := table[key.String()]; ok {
127-
peer.Connected = existing.Connected
128127
// If LastSeen is missing in the update, keep the existing one
129128
if peer.LastSeen == 0 {
130129
peer.LastSeen = existing.LastSeen

tools/establish_connection.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,15 @@
55
-H 'Authorization: Bearer YOUR_API_KEY' \
66
-H 'Content-Type: application/json' \
77
-d '{
8-
"model": "Qwen/Qwen3-32B",
8+
"model": "swissai/apertus3-70b-15T-sft",
99
"messages": [
1010
{ "role": "system", "content": "You are a helpful assistant." },
1111
{ "role": "user", "content": "What is the capital of France?" }
1212
],
1313
"temperature": 0.7
1414
}'"""
1515

16-
address = "QmNNRmz2etg76yFhc15cqvVPEKjCywSLSRFiN3brzYEV6u"
16+
address = "QmSNB58JK6TvpWpKqAQMJSmvZbzWLy5Qp9jkT8pNp9cJf5"
1717
cmd = command.replace("<address>", address)
1818
print(cmd)
1919
# Use subprocess to avoid shell interpretation issues

0 commit comments

Comments
 (0)