Skip to content

[testing]additional stats #5014

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions cli/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1434,6 +1434,7 @@ func RobotsPartTunnelAction(c *cli.Context, args robotsPartTunnelArgs) error {
func tunnelTraffic(ctx *cli.Context, robotClient *client.RobotClient, local, dest int) error {
// don't block tunnel attempt if ListTunnels fails in any way - it may be unimplemented.
// TODO: early return if ListTunnels fails.
startTime := time.Now()
if tunnels, err := robotClient.ListTunnels(ctx.Context); err == nil {
allowed := false
for _, t := range tunnels {
Expand All @@ -1455,7 +1456,14 @@ func tunnelTraffic(ctx *cli.Context, robotClient *client.RobotClient, local, des
if err != nil {
return fmt.Errorf("failed to create listener %w", err)
}
infof(ctx.App.Writer, "tunneling connections from local port %v to destination port %v on machine part...", local, dest)

infof(
ctx.App.Writer,
"tunneling connections from local port %v to destination port %v on machine part with measured network latency %.2fms...",
local,
dest,
float32(time.Since(startTime).Nanoseconds()/2000)/1000,
)
defer func() {
if err := li.Close(); err != nil {
warningf(ctx.App.ErrWriter, "error closing listener: %s", err)
Expand Down Expand Up @@ -1494,7 +1502,7 @@ func (c *viamClient) robotPartTunnel(cCtx *cli.Context, args robotsPartTunnelArg
partStr := args.Part

// Create logger based on presence of debugFlag.
logger := logging.FromZapCompatible(zap.NewNop().Sugar())
logger := logging.NewLogger("cli-robot-client")
globalArgs, err := getGlobalArgs(cCtx)
if err != nil {
return err
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -438,3 +438,5 @@ require (
github.com/ziutek/mymysql v1.5.4 // indirect
golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e
)

replace go.viam.com/utils => github.com/benjirewis/goutils v0.0.0-20250605140908-da1844ed8818
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o=
github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benjirewis/goutils v0.0.0-20250605140908-da1844ed8818 h1:RTOJDkmA+01j+wau5TTGRgGX77xlaG0GjpXIDD64DTM=
github.com/benjirewis/goutils v0.0.0-20250605140908-da1844ed8818/go.mod h1:AjmP/wKHvMkBBW55HJecD3I3Ci10vZGFv7RzFfJ6+mw=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
Expand Down Expand Up @@ -1538,8 +1540,6 @@ go.viam.com/api v0.1.438 h1:5tuM4b1Q2S1f/E8sgfbtivM/FuQYbTlf925o4QsCcck=
go.viam.com/api v0.1.438/go.mod h1:gwJriv6EVWe97uFzzzWjzP3NPfpCrKtRAdWtYglUpqs=
go.viam.com/test v1.2.4 h1:JYgZhsuGAQ8sL9jWkziAXN9VJJiKbjoi9BsO33TW3ug=
go.viam.com/test v1.2.4/go.mod h1:zI2xzosHdqXAJ/kFqcN+OIF78kQuTV2nIhGZ8EzvaJI=
go.viam.com/utils v0.1.143 h1:QyvniQw6+WDovfyhMzbDO6D2vPkiQpKyDkjDPu1OaE0=
go.viam.com/utils v0.1.143/go.mod h1:AjmP/wKHvMkBBW55HJecD3I3Ci10vZGFv7RzFfJ6+mw=
go4.org/unsafe/assume-no-moving-gc v0.0.0-20230525183740-e7c30c78aeb2 h1:WJhcL4p+YeDxmZWg141nRm7XC8IDmhz7lk5GpadO1Sg=
go4.org/unsafe/assume-no-moving-gc v0.0.0-20230525183740-e7c30c78aeb2/go.mod h1:FftLjUGFEDu5k8lt0ddY+HcrH/qU/0qk+H8j9/nTl3E=
gocv.io/x/gocv v0.25.0/go.mod h1:Rar2PS6DV+T4FL+PM535EImD/h13hGVaHhnCu1xarBs=
Expand Down
115 changes: 112 additions & 3 deletions robot/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package client

import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"math"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -1190,6 +1192,33 @@ func (rc *RobotClient) Tunnel(ctx context.Context, conn io.ReadWriteCloser, dest
return err
}

var diff float64
for range 5 {
sendTime := time.Now().UnixNano()
bytes := make([]byte, 8)
binary.LittleEndian.PutUint64(bytes, uint64(sendTime))
client.Send(&pb.TunnelRequest{Data: bytes})

timeMsg, err := client.Recv()
if err != nil {
return fmt.Errorf("failed to receive message from stream: %w", err)
}
srvTime := int64(binary.LittleEndian.Uint64(timeMsg.Data))
recTime := time.Now().UnixNano()
rTT := float64(recTime - sendTime)
tripTime := rTT / 2
expectedSrvTime := float64(sendTime) + tripTime
difference := float64(srvTime) - expectedSrvTime
diff += difference / 1000000
}
sendTime := time.Now().UnixNano()
bytes := make([]byte, 8)
binary.LittleEndian.PutUint64(bytes, uint64(sendTime))

client.Send(&pb.TunnelRequest{Data: bytes})
drift := diff / float64(5)
rc.logger.CInfof(ctx, "measured clock drift of %.2f ms", drift)

if err := client.Send(&pb.TunnelRequest{
DestinationPort: uint32(dest),
}); err != nil {
Expand Down Expand Up @@ -1225,17 +1254,97 @@ func (rc *RobotClient) Tunnel(ctx context.Context, conn io.ReadWriteCloser, dest
timerMu.Unlock()
wg.Done()
}()
// a max of 32kb will be sent per message (based on io.Copy's default buffer size)
sendFunc := func(data []byte) error { return client.Send(&pb.TunnelRequest{Data: data}) }
// a max of 1MB will be sent per message
sendFunc := func(data []byte) error {
sendTime := time.Now().UnixNano()
b := make([]byte, 8)
binary.LittleEndian.PutUint64(b, uint64(sendTime))
b = append(b, data...)
return client.Send(&pb.TunnelRequest{Data: b})
}
readerSenderErr = tunnel.ReaderSenderLoop(ctx, conn, sendFunc, connClosed, rc.logger.WithFields("loop", "reader/sender"))
})

var statsMu sync.Mutex
type stats struct {
latencies []int64
max int64
min int64
size []int64
max_size int64
min_size int64
}
stat := stats{
latencies: make([]int64, 0),
max: math.MinInt64,
min: math.MaxInt64,
size: make([]int64, 0),
max_size: math.MinInt64,
min_size: math.MaxInt64,
}

recvFunc := func() ([]byte, error) {
resp, err := client.Recv()
if err != nil {
return nil, err
}
return resp.Data, nil
sentTime := resp.Data[:8]
sentMilli := int64(binary.LittleEndian.Uint64(sentTime))
recTime := time.Now().UnixNano()
latency := recTime - sentMilli
statsMu.Lock()
if stat.max < latency {
stat.max = latency
}
if stat.min > latency {
stat.min = latency
}
stat.latencies = append(stat.latencies, latency)

pk_size := int64(len(resp.Data[8:]))
if stat.max_size < pk_size {
stat.max_size = pk_size
}
if stat.min_size > pk_size {
stat.min_size = pk_size
}
stat.size = append(stat.size, pk_size)
if len(stat.latencies) == 10 {
var total int64
for _, lat := range stat.latencies {
total += lat
}
mean := float64(total) / float64(10)
var total_size int64
for _, s := range stat.size {
total_size += s
}
avg_size := float64(total_size) / float64(10)
rc.logger.
CInfow(client.Context(),
"latency over last 10 messages (ms)",
"avg", fmt.Sprintf("%.2f", (mean/1000000)+drift),
"min", fmt.Sprintf("%.2f", float64(stat.min/1000000)+drift),
"max", fmt.Sprintf("%.2f", float64(stat.max/1000000)+drift),
)
rc.logger.
CInfow(client.Context(),
"size over last 10 messages (bytes)",
"avg", fmt.Sprintf("%.2f", avg_size),
"min", fmt.Sprintf("%d", stat.min_size),
"max", fmt.Sprintf("%d", stat.max_size),
)
stat = stats{
latencies: make([]int64, 0),
max: math.MinInt64,
min: math.MaxInt64,
size: make([]int64, 0),
max_size: math.MinInt64,
min_size: math.MaxInt64,
}
}
statsMu.Unlock()
return resp.Data[8:], nil
}
recvWriterErr := tunnel.RecvWriterLoop(ctx, recvFunc, conn, rsDone, rc.logger.WithFields("loop", "recv/writer"))
timerMu.Lock()
Expand Down
91 changes: 88 additions & 3 deletions robot/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ package server
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"math"
"net"
"strconv"
"sync"
Expand Down Expand Up @@ -70,6 +72,39 @@ func (s *Server) Tunnel(srv pb.RobotService_TunnelServer) error {
return fmt.Errorf("failed to receive first message from stream: %w", err)
}

var diff float64
for range 5 {
sendTime := time.Now().UnixNano()
bytes := make([]byte, 8)
binary.LittleEndian.PutUint64(bytes, uint64(sendTime))
srv.Send(&pb.TunnelResponse{Data: bytes})

timeMsg, err := srv.Recv()
if err != nil {
return fmt.Errorf("failed to receive message from stream: %w", err)
}
clientTime := int64(binary.LittleEndian.Uint64(timeMsg.Data))
recTime := time.Now().UnixNano()
rTT := float64(recTime - sendTime)
tripTime := rTT / 2
expectedClientTime := float64(sendTime) + tripTime
difference := float64(clientTime) - expectedClientTime
diff += difference / 1000000
}
sendTime := time.Now().UnixNano()
bytes := make([]byte, 8)
binary.LittleEndian.PutUint64(bytes, uint64(sendTime))
srv.Send(&pb.TunnelResponse{Data: bytes})

drift := diff / float64(5)
s.robot.Logger().CInfof(srv.Context(), "measured drift of %.2f ms", drift)

// regular operation
req, err = srv.Recv()
if err != nil {
return fmt.Errorf("failed to receive first message from stream: %w", err)
}

dialTimeout := defaultTunnelConnectionTimeout

// Ensure destination port is available; otherwise error.
Expand Down Expand Up @@ -113,16 +148,66 @@ func (s *Server) Tunnel(srv pb.RobotService_TunnelServer) error {
close(rsDone)
wg.Done()
}()
// a max of 32kb will be sent per message (based on io.Copy's default buffer size)
sendFunc := func(data []byte) error { return srv.Send(&pb.TunnelResponse{Data: data}) }
// a max of 1MB will be sent per message
sendFunc := func(data []byte) error {
sendTime := time.Now().UnixNano()
bytes := make([]byte, 8)
binary.LittleEndian.PutUint64(bytes, uint64(sendTime))
bytes = append(bytes, data...)
return srv.Send(&pb.TunnelResponse{Data: bytes})
}
readerSenderErr = tunnel.ReaderSenderLoop(srv.Context(), conn, sendFunc, connClosed, s.robot.Logger().WithFields("loop", "reader/sender"))
})

var statsMu sync.Mutex
type stats struct {
latencies []int64
max int64
min int64
}
stat := stats{
latencies: make([]int64, 0),
max: math.MinInt64,
min: math.MaxInt64,
}
recvFunc := func() ([]byte, error) {
req, err := srv.Recv()
if err != nil {
return nil, err
}
return req.Data, nil
sentTime := req.Data[:8]
sentNano := int64(binary.LittleEndian.Uint64(sentTime))
recTime := time.Now().UnixNano()
latency := recTime - sentNano
statsMu.Lock()
if stat.max < latency {
stat.max = latency
}
if stat.min > latency {
stat.min = latency
}
stat.latencies = append(stat.latencies, latency)
if len(stat.latencies) == 10 {
var total int64
for _, lat := range stat.latencies {
total += lat
}
mean := float64(total) / float64(10)
s.robot.Logger().
CInfow(srv.Context(),
"latency over last 10 messages (ms)",
"avg", fmt.Sprintf("%.2f", (mean/1000000)+drift),
"min", fmt.Sprintf("%.2f", float64(stat.min/1000000)+drift),
"max", fmt.Sprintf("%.2f", float64(stat.max/1000000)+drift),
)
stat = stats{
latencies: make([]int64, 0),
max: math.MinInt64,
min: math.MaxInt64,
}
}
statsMu.Unlock()
return req.Data[8:], nil
}
recvWriterErr := tunnel.RecvWriterLoop(srv.Context(), recvFunc, conn, rsDone, s.robot.Logger().WithFields("loop", "recv/writer"))
// close the connection to unblock the read
Expand Down
Loading