diff --git a/cli/client.go b/cli/client.go index 736c25fbced..27048c9d0ca 100644 --- a/cli/client.go +++ b/cli/client.go @@ -1434,6 +1434,7 @@ func RobotsPartTunnelAction(c *cli.Context, args robotsPartTunnelArgs) error { func tunnelTraffic(ctx *cli.Context, robotClient *client.RobotClient, local, dest int) error { // don't block tunnel attempt if ListTunnels fails in any way - it may be unimplemented. // TODO: early return if ListTunnels fails. + startTime := time.Now() if tunnels, err := robotClient.ListTunnels(ctx.Context); err == nil { allowed := false for _, t := range tunnels { @@ -1455,7 +1456,14 @@ func tunnelTraffic(ctx *cli.Context, robotClient *client.RobotClient, local, des if err != nil { return fmt.Errorf("failed to create listener %w", err) } - infof(ctx.App.Writer, "tunneling connections from local port %v to destination port %v on machine part...", local, dest) + + infof( + ctx.App.Writer, + "tunneling connections from local port %v to destination port %v on machine part with measured network latency %.2fms...", + local, + dest, + float32(time.Since(startTime).Nanoseconds()/2000)/1000, + ) defer func() { if err := li.Close(); err != nil { warningf(ctx.App.ErrWriter, "error closing listener: %s", err) @@ -1494,7 +1502,7 @@ func (c *viamClient) robotPartTunnel(cCtx *cli.Context, args robotsPartTunnelArg partStr := args.Part // Create logger based on presence of debugFlag. - logger := logging.FromZapCompatible(zap.NewNop().Sugar()) + logger := logging.NewLogger("cli-robot-client") globalArgs, err := getGlobalArgs(cCtx) if err != nil { return err diff --git a/go.mod b/go.mod index 835a4fde5d7..e3565098159 100644 --- a/go.mod +++ b/go.mod @@ -438,3 +438,5 @@ require ( github.com/ziutek/mymysql v1.5.4 // indirect golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e ) + +replace go.viam.com/utils => github.com/benjirewis/goutils v0.0.0-20250605140908-da1844ed8818 diff --git a/go.sum b/go.sum index b41ffe07f96..c104277767a 100644 --- a/go.sum +++ b/go.sum @@ -169,6 +169,8 @@ github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o= github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= +github.com/benjirewis/goutils v0.0.0-20250605140908-da1844ed8818 h1:RTOJDkmA+01j+wau5TTGRgGX77xlaG0GjpXIDD64DTM= +github.com/benjirewis/goutils v0.0.0-20250605140908-da1844ed8818/go.mod h1:AjmP/wKHvMkBBW55HJecD3I3Ci10vZGFv7RzFfJ6+mw= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -1538,8 +1540,6 @@ go.viam.com/api v0.1.438 h1:5tuM4b1Q2S1f/E8sgfbtivM/FuQYbTlf925o4QsCcck= go.viam.com/api v0.1.438/go.mod h1:gwJriv6EVWe97uFzzzWjzP3NPfpCrKtRAdWtYglUpqs= go.viam.com/test v1.2.4 h1:JYgZhsuGAQ8sL9jWkziAXN9VJJiKbjoi9BsO33TW3ug= go.viam.com/test v1.2.4/go.mod h1:zI2xzosHdqXAJ/kFqcN+OIF78kQuTV2nIhGZ8EzvaJI= -go.viam.com/utils v0.1.143 h1:QyvniQw6+WDovfyhMzbDO6D2vPkiQpKyDkjDPu1OaE0= -go.viam.com/utils v0.1.143/go.mod h1:AjmP/wKHvMkBBW55HJecD3I3Ci10vZGFv7RzFfJ6+mw= go4.org/unsafe/assume-no-moving-gc v0.0.0-20230525183740-e7c30c78aeb2 h1:WJhcL4p+YeDxmZWg141nRm7XC8IDmhz7lk5GpadO1Sg= go4.org/unsafe/assume-no-moving-gc v0.0.0-20230525183740-e7c30c78aeb2/go.mod h1:FftLjUGFEDu5k8lt0ddY+HcrH/qU/0qk+H8j9/nTl3E= gocv.io/x/gocv v0.25.0/go.mod h1:Rar2PS6DV+T4FL+PM535EImD/h13hGVaHhnCu1xarBs= diff --git a/robot/client/client.go b/robot/client/client.go index 394b34e16a0..c9bbeeaf5a3 100644 --- a/robot/client/client.go +++ b/robot/client/client.go @@ -3,9 +3,11 @@ package client import ( "context" + "encoding/binary" "errors" "fmt" "io" + "math" "strings" "sync" "sync/atomic" @@ -1190,6 +1192,33 @@ func (rc *RobotClient) Tunnel(ctx context.Context, conn io.ReadWriteCloser, dest return err } + var diff float64 + for range 5 { + sendTime := time.Now().UnixNano() + bytes := make([]byte, 8) + binary.LittleEndian.PutUint64(bytes, uint64(sendTime)) + client.Send(&pb.TunnelRequest{Data: bytes}) + + timeMsg, err := client.Recv() + if err != nil { + return fmt.Errorf("failed to receive message from stream: %w", err) + } + srvTime := int64(binary.LittleEndian.Uint64(timeMsg.Data)) + recTime := time.Now().UnixNano() + rTT := float64(recTime - sendTime) + tripTime := rTT / 2 + expectedSrvTime := float64(sendTime) + tripTime + difference := float64(srvTime) - expectedSrvTime + diff += difference / 1000000 + } + sendTime := time.Now().UnixNano() + bytes := make([]byte, 8) + binary.LittleEndian.PutUint64(bytes, uint64(sendTime)) + + client.Send(&pb.TunnelRequest{Data: bytes}) + drift := diff / float64(5) + rc.logger.CInfof(ctx, "measured clock drift of %.2f ms", drift) + if err := client.Send(&pb.TunnelRequest{ DestinationPort: uint32(dest), }); err != nil { @@ -1225,17 +1254,97 @@ func (rc *RobotClient) Tunnel(ctx context.Context, conn io.ReadWriteCloser, dest timerMu.Unlock() wg.Done() }() - // a max of 32kb will be sent per message (based on io.Copy's default buffer size) - sendFunc := func(data []byte) error { return client.Send(&pb.TunnelRequest{Data: data}) } + // a max of 1MB will be sent per message + sendFunc := func(data []byte) error { + sendTime := time.Now().UnixNano() + b := make([]byte, 8) + binary.LittleEndian.PutUint64(b, uint64(sendTime)) + b = append(b, data...) + return client.Send(&pb.TunnelRequest{Data: b}) + } readerSenderErr = tunnel.ReaderSenderLoop(ctx, conn, sendFunc, connClosed, rc.logger.WithFields("loop", "reader/sender")) }) + var statsMu sync.Mutex + type stats struct { + latencies []int64 + max int64 + min int64 + size []int64 + max_size int64 + min_size int64 + } + stat := stats{ + latencies: make([]int64, 0), + max: math.MinInt64, + min: math.MaxInt64, + size: make([]int64, 0), + max_size: math.MinInt64, + min_size: math.MaxInt64, + } + recvFunc := func() ([]byte, error) { resp, err := client.Recv() if err != nil { return nil, err } - return resp.Data, nil + sentTime := resp.Data[:8] + sentMilli := int64(binary.LittleEndian.Uint64(sentTime)) + recTime := time.Now().UnixNano() + latency := recTime - sentMilli + statsMu.Lock() + if stat.max < latency { + stat.max = latency + } + if stat.min > latency { + stat.min = latency + } + stat.latencies = append(stat.latencies, latency) + + pk_size := int64(len(resp.Data[8:])) + if stat.max_size < pk_size { + stat.max_size = pk_size + } + if stat.min_size > pk_size { + stat.min_size = pk_size + } + stat.size = append(stat.size, pk_size) + if len(stat.latencies) == 10 { + var total int64 + for _, lat := range stat.latencies { + total += lat + } + mean := float64(total) / float64(10) + var total_size int64 + for _, s := range stat.size { + total_size += s + } + avg_size := float64(total_size) / float64(10) + rc.logger. + CInfow(client.Context(), + "latency over last 10 messages (ms)", + "avg", fmt.Sprintf("%.2f", (mean/1000000)+drift), + "min", fmt.Sprintf("%.2f", float64(stat.min/1000000)+drift), + "max", fmt.Sprintf("%.2f", float64(stat.max/1000000)+drift), + ) + rc.logger. + CInfow(client.Context(), + "size over last 10 messages (bytes)", + "avg", fmt.Sprintf("%.2f", avg_size), + "min", fmt.Sprintf("%d", stat.min_size), + "max", fmt.Sprintf("%d", stat.max_size), + ) + stat = stats{ + latencies: make([]int64, 0), + max: math.MinInt64, + min: math.MaxInt64, + size: make([]int64, 0), + max_size: math.MinInt64, + min_size: math.MaxInt64, + } + } + statsMu.Unlock() + return resp.Data[8:], nil } recvWriterErr := tunnel.RecvWriterLoop(ctx, recvFunc, conn, rsDone, rc.logger.WithFields("loop", "recv/writer")) timerMu.Lock() diff --git a/robot/server/server.go b/robot/server/server.go index aac7f3a74da..a875185dbaa 100644 --- a/robot/server/server.go +++ b/robot/server/server.go @@ -6,8 +6,10 @@ package server import ( "bytes" "context" + "encoding/binary" "errors" "fmt" + "math" "net" "strconv" "sync" @@ -70,6 +72,39 @@ func (s *Server) Tunnel(srv pb.RobotService_TunnelServer) error { return fmt.Errorf("failed to receive first message from stream: %w", err) } + var diff float64 + for range 5 { + sendTime := time.Now().UnixNano() + bytes := make([]byte, 8) + binary.LittleEndian.PutUint64(bytes, uint64(sendTime)) + srv.Send(&pb.TunnelResponse{Data: bytes}) + + timeMsg, err := srv.Recv() + if err != nil { + return fmt.Errorf("failed to receive message from stream: %w", err) + } + clientTime := int64(binary.LittleEndian.Uint64(timeMsg.Data)) + recTime := time.Now().UnixNano() + rTT := float64(recTime - sendTime) + tripTime := rTT / 2 + expectedClientTime := float64(sendTime) + tripTime + difference := float64(clientTime) - expectedClientTime + diff += difference / 1000000 + } + sendTime := time.Now().UnixNano() + bytes := make([]byte, 8) + binary.LittleEndian.PutUint64(bytes, uint64(sendTime)) + srv.Send(&pb.TunnelResponse{Data: bytes}) + + drift := diff / float64(5) + s.robot.Logger().CInfof(srv.Context(), "measured drift of %.2f ms", drift) + + // regular operation + req, err = srv.Recv() + if err != nil { + return fmt.Errorf("failed to receive first message from stream: %w", err) + } + dialTimeout := defaultTunnelConnectionTimeout // Ensure destination port is available; otherwise error. @@ -113,16 +148,66 @@ func (s *Server) Tunnel(srv pb.RobotService_TunnelServer) error { close(rsDone) wg.Done() }() - // a max of 32kb will be sent per message (based on io.Copy's default buffer size) - sendFunc := func(data []byte) error { return srv.Send(&pb.TunnelResponse{Data: data}) } + // a max of 1MB will be sent per message + sendFunc := func(data []byte) error { + sendTime := time.Now().UnixNano() + bytes := make([]byte, 8) + binary.LittleEndian.PutUint64(bytes, uint64(sendTime)) + bytes = append(bytes, data...) + return srv.Send(&pb.TunnelResponse{Data: bytes}) + } readerSenderErr = tunnel.ReaderSenderLoop(srv.Context(), conn, sendFunc, connClosed, s.robot.Logger().WithFields("loop", "reader/sender")) }) + + var statsMu sync.Mutex + type stats struct { + latencies []int64 + max int64 + min int64 + } + stat := stats{ + latencies: make([]int64, 0), + max: math.MinInt64, + min: math.MaxInt64, + } recvFunc := func() ([]byte, error) { req, err := srv.Recv() if err != nil { return nil, err } - return req.Data, nil + sentTime := req.Data[:8] + sentNano := int64(binary.LittleEndian.Uint64(sentTime)) + recTime := time.Now().UnixNano() + latency := recTime - sentNano + statsMu.Lock() + if stat.max < latency { + stat.max = latency + } + if stat.min > latency { + stat.min = latency + } + stat.latencies = append(stat.latencies, latency) + if len(stat.latencies) == 10 { + var total int64 + for _, lat := range stat.latencies { + total += lat + } + mean := float64(total) / float64(10) + s.robot.Logger(). + CInfow(srv.Context(), + "latency over last 10 messages (ms)", + "avg", fmt.Sprintf("%.2f", (mean/1000000)+drift), + "min", fmt.Sprintf("%.2f", float64(stat.min/1000000)+drift), + "max", fmt.Sprintf("%.2f", float64(stat.max/1000000)+drift), + ) + stat = stats{ + latencies: make([]int64, 0), + max: math.MinInt64, + min: math.MaxInt64, + } + } + statsMu.Unlock() + return req.Data[8:], nil } recvWriterErr := tunnel.RecvWriterLoop(srv.Context(), recvFunc, conn, rsDone, s.robot.Logger().WithFields("loop", "recv/writer")) // close the connection to unblock the read