diff --git a/data/result.go b/data/result.go index 36bb1571..ad2eaa0e 100644 --- a/data/result.go +++ b/data/result.go @@ -45,4 +45,5 @@ type NDTResult struct { // ndt7 Upload *model.ArchivalData `json:",omitempty"` Download *model.ArchivalData `json:",omitempty"` + Ping *model.ArchivalData `json:",omitempty"` } diff --git a/html/ndt7-ping.js b/html/ndt7-ping.js new file mode 100644 index 00000000..b2ed6c04 --- /dev/null +++ b/html/ndt7-ping.js @@ -0,0 +1,22 @@ +/* jshint esversion: 6, asi: true, worker: true */ +// WebWorker that runs the ndt7 ping test +onmessage = function (ev) { + 'use strict' + let url = new URL(ev.data.href) + url.protocol = (url.protocol === 'https:') ? 'wss:' : 'ws:' + url.pathname = '/ndt/v7/ping' + const sock = new WebSocket(url.toString(), 'net.measurementlab.ndt.v7') + sock.onclose = function () { + postMessage(null) + } + sock.onopen = function () { + sock.onmessage = function (ev) { + if (!(ev.data instanceof Blob)) { + let m = JSON.parse(ev.data) + m.Origin = 'server' + m.Test = 'ping' + postMessage(m) + } + } + } +} diff --git a/html/ndt7.html b/html/ndt7.html index e5e18742..548355c1 100644 --- a/html/ndt7.html +++ b/html/ndt7.html @@ -24,8 +24,10 @@
+
[Ping]
[Download]
[Upload]
+
diff --git a/ndt-server.go b/ndt-server.go index e9790aed..48706eab 100644 --- a/ndt-server.go +++ b/ndt-server.go @@ -159,6 +159,7 @@ func main() { } ndt7Mux.Handle(spec.DownloadURLPath, http.HandlerFunc(ndt7Handler.Download)) ndt7Mux.Handle(spec.UploadURLPath, http.HandlerFunc(ndt7Handler.Upload)) + ndt7Mux.Handle(spec.PingURLPath, http.HandlerFunc(ndt7Handler.Ping)) ndt7Server := &http.Server{ Addr: *ndt7Addr, Handler: logging.MakeAccessLogHandler(ndt7Mux), diff --git a/ndt7/download/download.go b/ndt7/download/download.go index 832d7a21..6ae0e741 100644 --- a/ndt7/download/download.go +++ b/ndt7/download/download.go @@ -3,6 +3,7 @@ package download import ( "context" + "time" "github.com/gorilla/websocket" "github.com/m-lab/ndt-server/ndt7/download/sender" @@ -15,13 +16,15 @@ import ( // Do implements the download subtest. The ctx argument is the parent // context for the subtest. The conn argument is the open WebSocket // connection. The resultfp argument is the file where to save results. Both -// arguments are owned by the caller of this function. -func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File) { +// arguments are owned by the caller of this function. The start argument is +// the test start time used to calculate ElapsedTime and deadlines. 
+func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File, start time.Time) { // Implementation note: use child context so that, if we cannot save the // results in the loop below, we terminate the goroutines early wholectx, cancel := context.WithCancel(ctx) defer cancel() - senderch := sender.Start(conn, measurer.Start(wholectx, conn, resultfp.Data.UUID)) - receiverch := receiver.StartDownloadReceiver(wholectx, conn) + senderch := sender.Start(conn, measurer.Start(wholectx, conn, resultfp.Data.UUID, start), start) + receiverch := receiver.StartDownloadReceiver(wholectx, conn, start) + saver.SaveAll(resultfp, senderch, receiverch) } diff --git a/ndt7/download/sender/sender.go b/ndt7/download/sender/sender.go index 5c4efc03..9f9f86cc 100644 --- a/ndt7/download/sender/sender.go +++ b/ndt7/download/sender/sender.go @@ -9,7 +9,7 @@ import ( "github.com/m-lab/ndt-server/logging" "github.com/m-lab/ndt-server/ndt7/closer" "github.com/m-lab/ndt-server/ndt7/model" - "github.com/m-lab/ndt-server/ndt7/ping" + "github.com/m-lab/ndt-server/ndt7/ping/message" "github.com/m-lab/ndt-server/ndt7/spec" ) @@ -22,7 +22,7 @@ func makePreparedMessage(size int) (*websocket.PreparedMessage, error) { return websocket.NewPreparedMessage(websocket.BinaryMessage, data) } -func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.Measurement) { +func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.Measurement, start time.Time) { logging.Logger.Debug("sender: start") defer logging.Logger.Debug("sender: stop") defer close(dst) @@ -38,12 +38,18 @@ func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.M logging.Logger.WithError(err).Warn("sender: makePreparedMessage failed") return } - deadline := time.Now().Add(spec.MaxRuntime) + deadline := start.Add(spec.MaxRuntime) err = conn.SetWriteDeadline(deadline) // Liveness! 
if err != nil { logging.Logger.WithError(err).Warn("sender: conn.SetWriteDeadline failed") return } + // One RTT sample is taken before flooding the connection with data. + // That sample is not affected by HOL, so it has additional value and is treated specially. + if err := message.SendTicks(conn, start, deadline); err != nil { + logging.Logger.WithError(err).Warn("sender: ping.message.SendTicks failed") + return + } var totalSent int64 for { select { @@ -57,8 +63,8 @@ func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.M return } dst <- m // Liveness: this is blocking - if err := ping.SendTicks(conn, deadline); err != nil { - logging.Logger.WithError(err).Warn("sender: ping.SendTicks failed") + if err := message.SendTicks(conn, start, deadline); err != nil { + logging.Logger.WithError(err).Warn("sender: ping.message.SendTicks failed") return } default: @@ -99,9 +105,9 @@ func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.M // Liveness guarantee: the sender will not be stuck sending for more then // the MaxRuntime of the subtest, provided that the consumer will // continue reading from the returned channel. This is enforced by -// setting the write deadline to Time.Now() + MaxRuntime. -func Start(conn *websocket.Conn, src <-chan model.Measurement) <-chan model.Measurement { +// setting the write deadline to |start| + MaxRuntime. 
+func Start(conn *websocket.Conn, src <-chan model.Measurement, start time.Time) <-chan model.Measurement { dst := make(chan model.Measurement) - go loop(conn, src, dst) + go loop(conn, src, dst, start) return dst } diff --git a/ndt7/handler/handler.go b/ndt7/handler/handler.go index 0d7a522f..e6288259 100644 --- a/ndt7/handler/handler.go +++ b/ndt7/handler/handler.go @@ -16,6 +16,7 @@ import ( "github.com/m-lab/ndt-server/ndt7/results" "github.com/m-lab/ndt-server/ndt7/spec" "github.com/m-lab/ndt-server/ndt7/upload" + "github.com/m-lab/ndt-server/ndt7/ping" "github.com/m-lab/ndt-server/version" ) @@ -36,14 +37,15 @@ func warnAndClose(writer http.ResponseWriter, message string) { // testerFunc is the function implementing a subtest. The first argument // is the subtest context. The second argument is the connected websocket. The // third argument is the open file where to write results. This function does -// not own the second or the third argument. -type testerFunc = func(context.Context, *websocket.Conn, *results.File) +// not own the second or the third argument. The fourth argument is the base +// start time of the test. +type testerFunc = func(context.Context, *websocket.Conn, *results.File, time.Time) // downloadOrUpload implements both download and upload. The writer argument // is the HTTP response writer. The request argument is the HTTP request -// that we received. The kind argument must be spec.SubtestDownload or -// spec.SubtestUpload. The tester is a function actually implementing the -// requested ndt7 subtest. +// that we received. The kind argument must be spec.SubtestDownload, +// spec.SubtestUpload, or SubtestPing. The tester is a function actually +// implementing the requested ndt7 subtest. 
func (h Handler) downloadOrUpload(writer http.ResponseWriter, request *http.Request, kind spec.SubtestKind, tester testerFunc) { logging.Logger.Debug("downloadOrUpload: upgrading to WebSockets") if request.Header.Get("Sec-WebSocket-Protocol") != spec.SecWebSocketProtocol { @@ -106,6 +108,8 @@ func (h Handler) downloadOrUpload(writer http.ResponseWriter, request *http.Requ result.Download = resultfp.Data } else if kind == spec.SubtestUpload { result.Upload = resultfp.Data + } else if kind == spec.SubtestPing { + result.Ping = resultfp.Data } else { logging.Logger.Warn(string(kind) + ": data not saved") } @@ -114,7 +118,7 @@ func (h Handler) downloadOrUpload(writer http.ResponseWriter, request *http.Requ } warnonerror.Close(resultfp, string(kind)+": ignoring resultfp.Close error") }() - tester(request.Context(), conn, resultfp) + tester(request.Context(), conn, resultfp, result.StartTime) } // Download handles the download subtest. @@ -126,3 +130,8 @@ func (h Handler) Download(writer http.ResponseWriter, request *http.Request) { func (h Handler) Upload(writer http.ResponseWriter, request *http.Request) { h.downloadOrUpload(writer, request, spec.SubtestUpload, upload.Do) } + +// Ping handles the ping subtest. 
+func (h Handler) Ping(writer http.ResponseWriter, request *http.Request) { + h.downloadOrUpload(writer, request, spec.SubtestPing, ping.Do) +} diff --git a/ndt7/measurer/measurer.go b/ndt7/measurer/measurer.go index da1afa64..fc4c3f2b 100644 --- a/ndt7/measurer/measurer.go +++ b/ndt7/measurer/measurer.go @@ -54,7 +54,7 @@ func measure(measurement *model.Measurement, sockfp *os.File, elapsed time.Durat } } -func loop(ctx context.Context, conn *websocket.Conn, UUID string, dst chan<- model.Measurement) { +func loop(ctx context.Context, conn *websocket.Conn, UUID string, dst chan<- model.Measurement, start time.Time) { logging.Logger.Debug("measurer: start") defer logging.Logger.Debug("measurer: stop") defer close(dst) @@ -66,7 +66,6 @@ func loop(ctx context.Context, conn *websocket.Conn, UUID string, dst chan<- mod return } defer sockfp.Close() - start := time.Now() connectionInfo := &model.ConnectionInfo{ Client: conn.RemoteAddr().String(), Server: conn.LocalAddr().String(), @@ -104,9 +103,9 @@ func loop(ctx context.Context, conn *websocket.Conn, UUID string, dst chan<- mod // a timeout of DefaultRuntime seconds, provided that the consumer // continues reading from the returned channel. 
func Start( - ctx context.Context, conn *websocket.Conn, UUID string, + ctx context.Context, conn *websocket.Conn, UUID string, start time.Time, ) <-chan model.Measurement { dst := make(chan model.Measurement) - go loop(ctx, conn, UUID, dst) + go loop(ctx, conn, UUID, dst, start) return dst } diff --git a/ndt7/model/measurement.go b/ndt7/model/measurement.go index d0cee8c5..1e3b418a 100644 --- a/ndt7/model/measurement.go +++ b/ndt7/model/measurement.go @@ -8,4 +8,5 @@ type Measurement struct { ConnectionInfo *ConnectionInfo `json:",omitempty" bigquery:"-"` BBRInfo *BBRInfo `json:",omitempty"` TCPInfo *TCPInfo `json:",omitempty"` + WSPingInfo *WSPingInfo `json:",omitempty"` } diff --git a/ndt7/model/wspinginfo.go b/ndt7/model/wspinginfo.go new file mode 100644 index 00000000..d16ac226 --- /dev/null +++ b/ndt7/model/wspinginfo.go @@ -0,0 +1,9 @@ +package model + +// WSPingInfo contains an application level (websocket) ping measurement data. +// This structure is described in the ndt7 specification. +type WSPingInfo struct { + ElapsedTime int64 + LastRTT int64 // TCPInfo.RTT is smoothed RTT, LastRTT is just a sample. + MinRTT int64 +} diff --git a/ndt7/ping/message/message.go b/ndt7/ping/message/message.go new file mode 100644 index 00000000..d8cb5725 --- /dev/null +++ b/ndt7/ping/message/message.go @@ -0,0 +1,47 @@ +// Package message implements operations with WebSocket PING messages. +package message + +import ( + "encoding/json" + "errors" + "time" + + "github.com/gorilla/websocket" +) + +// The json object is used as a namespace to avoid erratic interpretation of +// unsolicited pong frames. Ping and pong frames are not a part of +// Sec-WebSocket-Protocol, they're part of RFC6455. Section 5.5.3 of the RFC +// allows unsolicited pong frames. Some browsers are known to send unsolicited +// pong frames, see golang/go#6377 . +type pingMessage struct { + Ndt7TS int64 +} + +// SendTicks sends the current ticks as a ping message. 
+func SendTicks(conn *websocket.Conn, start time.Time, deadline time.Time) error { + msg := pingMessage{ + Ndt7TS: time.Since(start).Nanoseconds(), + } + data, err := json.Marshal(msg) + if err == nil { + err = conn.WriteControl(websocket.PingMessage, data, deadline) + } + return err +} + +func ParseTicks(s string, start time.Time) (elapsed time.Duration, d time.Duration, err error) { + elapsed = time.Since(start) + var msg pingMessage + err = json.Unmarshal([]byte(s), &msg) + if err != nil { + return + } + prev := msg.Ndt7TS + if 0 <= prev && prev <= elapsed.Nanoseconds() { + d = time.Duration(elapsed.Nanoseconds() - prev) + } else { + err = errors.New("RTT is negative") + } + return +} diff --git a/ndt7/ping/mux/mux.go b/ndt7/ping/mux/mux.go new file mode 100644 index 00000000..2dd05e1e --- /dev/null +++ b/ndt7/ping/mux/mux.go @@ -0,0 +1,102 @@ +// Package mux implements the ping subtest channel multiplexor. +package mux + +import ( + "context" + + "github.com/m-lab/ndt-server/logging" + "github.com/m-lab/ndt-server/ndt7/model" + "github.com/m-lab/ndt-server/ndt7/ping/receiver" +) + +func upsert(self *model.Measurement, m model.Measurement) { + if m.AppInfo != nil { + self.AppInfo = m.AppInfo + } + if m.ConnectionInfo != nil { + self.ConnectionInfo = m.ConnectionInfo + } + if m.BBRInfo != nil { + self.BBRInfo = m.BBRInfo + } + if m.TCPInfo != nil { + self.TCPInfo = m.TCPInfo + } + if m.WSPingInfo != nil { + self.WSPingInfo = m.WSPingInfo + } +} + +func loop( + measurerch <-chan model.Measurement, receiverch <-chan receiver.Measurement, + senderch, serverlog, clientlog chan<- model.Measurement, + cancel context.CancelFunc, +) { + logging.Logger.Debug("mux: start") + defer logging.Logger.Debug("mux: stop") + + state := model.Measurement{} + for measurerch != nil || receiverch != nil { + select { + case m, ok := <-measurerch: + if ok { + serverlog <- m + upsert(&state, m) + if state.WSPingInfo != nil { + // Pong arrived, there is no in-flight ping. 
Perfect time for another sample. + senderch <- state + state = model.Measurement{} + } + } else { + logging.Logger.Debug("mux: measurerch closed") + measurerch = nil + // Propagate EOF to close websocket when measurer stops ticking. + close(senderch) + } + case m, ok := <-receiverch: + if ok { + if m.IsServerOrigin { // likely, pong frame + serverlog <- m.Measurement + // The sample is forwarded to the client with the next TCPInfo frame. + upsert(&state, m.Measurement) + } else { // json from the client + clientlog <- m.Measurement + } + } else { + logging.Logger.Debug("mux: receiverch closed") + receiverch = nil + // Stop measurer's ticker. Receiver has already finished its duties. + cancel() + } + } + } + close(serverlog) + close(clientlog) +} + +// MuxOutput is a return value of mux.Start to avoid confusion in argument ordering. +type MuxOutput struct { + SenderC <-chan model.Measurement + ServerLog <-chan model.Measurement + ClientLog <-chan model.Measurement +} + +// Start starts the channel multiplexor in a background goroutine. +// +// Liveness guarantees: +// 1) mux drains measurerch, otherwise measurer is deadlocked on send(); +// 2) mux drains receiverch, otherwise receiver MAY become deadlocked on send(clientMessage); +// 3) mux closes output channels when it's done; +// 4) EOF from receiverch is a signal to call |cancel| to terminate early. +func Start(measurerch <-chan model.Measurement, receiverch <-chan receiver.Measurement, cancel context.CancelFunc) MuxOutput { + senderch := make(chan model.Measurement) + serverlog := make(chan model.Measurement) + clientlog := make(chan model.Measurement) + go loop(measurerch, receiverch, senderch, serverlog, clientlog, cancel) + return MuxOutput{ + // ServerLog is not named "ServerC" to avoid visual confusion with "SenderC". 
+ SenderC: senderch, + ServerLog: serverlog, + ClientLog: clientlog, + } +} diff --git a/ndt7/ping/ping.go b/ndt7/ping/ping.go index 2af7c7ff..eacbff2f 100644 --- a/ndt7/ping/ping.go +++ b/ndt7/ping/ping.go @@ -1,32 +1,32 @@ -// Package ping implements WebSocket PING messages. +// Package ping implements the ndt7 ping test. package ping import ( - "encoding/json" + "context" "time" "github.com/gorilla/websocket" + "github.com/m-lab/ndt-server/ndt7/measurer" + "github.com/m-lab/ndt-server/ndt7/ping/mux" + "github.com/m-lab/ndt-server/ndt7/ping/receiver" + "github.com/m-lab/ndt-server/ndt7/ping/sender" + "github.com/m-lab/ndt-server/ndt7/results" + "github.com/m-lab/ndt-server/ndt7/saver" + "github.com/m-lab/ndt-server/ndt7/spec" ) -// SendTicks sends the current ticks as a ping message. -func SendTicks(conn *websocket.Conn, deadline time.Time) error { - // TODO(bassosimone): when we'll have a unique base time.Time reference for - // the whole test, we should use that, since UnixNano() is not monotonic. - ticks := int64(time.Now().UnixNano()) - data, err := json.Marshal(ticks) - if err == nil { - err = conn.WriteControl(websocket.PingMessage, data, deadline) - } - return err -} - -func ParseTicks(s string) (d int64, err error) { - // TODO(bassosimone): when we'll have a unique base time.Time reference for - // the whole test, we should use that, since UnixNano() is not monotonic. - var prev int64 - err = json.Unmarshal([]byte(s), &prev) - if err == nil { - d = (int64(time.Now().UnixNano()) - prev) - } - return +// Do implements the ping subtest. The ctx argument is the parent +// context for the subtest. The conn argument is the open WebSocket +// connection. The resultfp argument is the file where to save results. Both +// arguments are owned by the caller of this function. The start argument is +// the test start time used to calculate ElapsedTime and deadlines. 
+func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File, start time.Time) { + wholectx, cancel := context.WithTimeout(ctx, spec.MaxRuntime) + // saver.SaveAll() blocks till channels are drained, so cancel() is just for consistency here. + defer cancel() + measurerch := measurer.Start(wholectx, conn, resultfp.Data.UUID, start) + receiverch := receiver.Start(wholectx, conn, start) + x := mux.Start(measurerch, receiverch, cancel) + sender.Start(conn, x.SenderC, start) + saver.SaveAll(resultfp, x.ServerLog, x.ClientLog) } diff --git a/ndt7/ping/receiver/receiver.go b/ndt7/ping/receiver/receiver.go new file mode 100644 index 00000000..f22668ed --- /dev/null +++ b/ndt7/ping/receiver/receiver.go @@ -0,0 +1,101 @@ +// Package receiver implements the ping subtest receiver. +package receiver + +import ( + "context" + "encoding/json" + "math" + "time" + + "github.com/gorilla/websocket" + "github.com/m-lab/ndt-server/logging" + "github.com/m-lab/ndt-server/ndt7/model" + "github.com/m-lab/ndt-server/ndt7/ping/message" + "github.com/m-lab/ndt-server/ndt7/spec" +) + +type Measurement struct { + Measurement model.Measurement + IsServerOrigin bool +} + +const ( + maxDuration = math.MaxInt64 * time.Nanosecond +) + +func loop(ctx context.Context, conn *websocket.Conn, receiverch chan<- Measurement, start time.Time) { + logging.Logger.Debug("receiver: start") + defer logging.Logger.Debug("receiver: stop") + defer close(receiverch) + + conn.SetReadLimit(spec.MaxMessageSize) + + deadline, ok := ctx.Deadline() + if !ok { + panic("You passed me a context.Context without deadline") + } + + if err := conn.SetReadDeadline(deadline); err != nil { + logging.Logger.WithError(err).Warn("receiver: conn.SetReadDeadline failed") + return + } + + minRTT := maxDuration + + conn.SetPongHandler(func(s string) error { + elapsed, rtt, err := message.ParseTicks(s, start) + if err == nil { + if rtt < minRTT { + minRTT = rtt + } + m := Measurement{ + Measurement: model.Measurement{ + 
WSPingInfo: &model.WSPingInfo{ + ElapsedTime: int64(elapsed / time.Microsecond), + LastRTT: int64(rtt / time.Microsecond), + MinRTT: int64(minRTT / time.Microsecond), + }, + }, + IsServerOrigin: true, + } + receiverch <- m + } + return err + }) + + for ctx.Err() == nil { + mtype, mdata, err := conn.ReadMessage() + if err != nil { + if websocket.IsCloseError(err, websocket.CloseNormalClosure) { + return + } + logging.Logger.WithError(err).Warn("receiver: conn.ReadMessage failed") + return + } + if mtype != websocket.TextMessage { + logging.Logger.Warn("receiver: got non-Text message") + return // Unexpected message type + } + m := Measurement{ + IsServerOrigin: false, + } + err = json.Unmarshal(mdata, &m.Measurement) + if err != nil { + logging.Logger.WithError(err).Warn("receiver: json.Unmarshal failed") + return + } + receiverch <- m + } +} + +// Start starts the receiver in a background goroutine. The receiver processes pong frames +// and the measurement messages coming from conn. +// +// Liveness guarantees: +// 1) receiver uses ctx as the deadline for all conn operations and the goroutine itself, +// 2) receiver closes output channels when it's done. +func Start(ctx context.Context, conn *websocket.Conn, start time.Time) <-chan Measurement { + receiverch := make(chan Measurement) + go loop(ctx, conn, receiverch, start) + return receiverch +} diff --git a/ndt7/ping/sender/sender.go b/ndt7/ping/sender/sender.go new file mode 100644 index 00000000..2096e04f --- /dev/null +++ b/ndt7/ping/sender/sender.go @@ -0,0 +1,63 @@ +// Package sender implements the ping subtest sender. 
+package sender + +import ( + "time" + + "github.com/gorilla/websocket" + "github.com/m-lab/ndt-server/logging" + "github.com/m-lab/ndt-server/ndt7/closer" + "github.com/m-lab/ndt-server/ndt7/model" + "github.com/m-lab/ndt-server/ndt7/ping/message" + "github.com/m-lab/ndt-server/ndt7/spec" +) + +func loop(conn *websocket.Conn, senderch <-chan model.Measurement, start time.Time) { + logging.Logger.Debug("sender: start") + defer logging.Logger.Debug("sender: stop") + + defer func() { + for range senderch { + // drain the channel (in case of error) + } + }() + + deadline := start.Add(spec.MaxRuntime) + + if err := conn.SetWriteDeadline(deadline); err != nil { + logging.Logger.WithError(err).Warn("sender: conn.SetWriteDeadline failed") + return + } + + // Initial ping is sent ASAP as all the following pings are sent if and only if pong frame + // has arrived. So, without this ping, no pong arrives and the test gets stuck till deadline. + if err := message.SendTicks(conn, start, deadline); err != nil { + logging.Logger.WithError(err).Warn("sender: ping.message.SendTicks failed") + return + } + + for m := range senderch { + if err := conn.WriteJSON(m); err != nil { + logging.Logger.WithError(err).Warn("sender: conn.WriteJSON failed") + return + } + if err := message.SendTicks(conn, start, deadline); err != nil { + logging.Logger.WithError(err).Warn("sender: ping.message.SendTicks failed") + return + } + } + + closer.StartClosing(conn) +} + +// Start starts the sender in a background goroutine. The sender will send +// to the client the measurement messages coming from senderch. Websocket ping +// frame will be sent right after the message. The sender does not signal errors, +// early cancellation in case of a network error is delegated to the receiver. +// +// Liveness guarantees: +// 1) sender keeps MaxRuntime as a timeout for conn operations; +// 2) sender drains the senderch, otherwise mux is deadlocked on send(TCPInfo). 
+func Start(conn *websocket.Conn, senderch <-chan model.Measurement, start time.Time) { + go loop(conn, senderch, start) +} diff --git a/ndt7/receiver/receiver.go b/ndt7/receiver/receiver.go index ab423ed2..d649ec00 100644 --- a/ndt7/receiver/receiver.go +++ b/ndt7/receiver/receiver.go @@ -10,7 +10,7 @@ import ( "github.com/gorilla/websocket" "github.com/m-lab/ndt-server/logging" "github.com/m-lab/ndt-server/ndt7/model" - "github.com/m-lab/ndt-server/ndt7/ping" + "github.com/m-lab/ndt-server/ndt7/ping/message" "github.com/m-lab/ndt-server/ndt7/spec" ) @@ -23,7 +23,7 @@ const ( func loop( ctx context.Context, conn *websocket.Conn, kind receiverKind, - dst chan<- model.Measurement, + dst chan<- model.Measurement, start time.Time, ) { logging.Logger.Debug("receiver: start") defer logging.Logger.Debug("receiver: stop") @@ -31,16 +31,17 @@ func loop( conn.SetReadLimit(spec.MaxMessageSize) receiverctx, cancel := context.WithTimeout(ctx, spec.MaxRuntime) defer cancel() - err := conn.SetReadDeadline(time.Now().Add(spec.MaxRuntime)) // Liveness! + err := conn.SetReadDeadline(start.Add(spec.MaxRuntime)) // Liveness! if err != nil { logging.Logger.WithError(err).Warn("receiver: conn.SetReadDeadline failed") return } conn.SetPongHandler(func(s string) error { - rtt, err := ping.ParseTicks(s) + _, rtt, err := message.ParseTicks(s, start) if err == nil { - rtt /= int64(time.Millisecond) - logging.Logger.Debugf("receiver: ApplicationLevel RTT: %d ms", rtt) + // Writing rtt to |dst| will write the Measurement to `ClientMeasurements` object. + // That goes against data format, so the value is just logged. 
+ logging.Logger.Debugf("receiver: ApplicationLevel RTT: %d ms", int64(rtt / time.Millisecond)) } return err }) @@ -72,9 +73,9 @@ func loop( } } -func start(ctx context.Context, conn *websocket.Conn, kind receiverKind) <-chan model.Measurement { +func startReceiver(ctx context.Context, conn *websocket.Conn, kind receiverKind, start time.Time) (<-chan model.Measurement) { dst := make(chan model.Measurement) - go loop(ctx, conn, kind, dst) + go loop(ctx, conn, kind, dst, start) return dst } @@ -87,13 +88,13 @@ func start(ctx context.Context, conn *websocket.Conn, kind receiverKind) <-chan // Liveness guarantee: the goroutine will always terminate after a // MaxRuntime timeout, provided that the consumer will keep reading // from the returned channel. -func StartDownloadReceiver(ctx context.Context, conn *websocket.Conn) <-chan model.Measurement { - return start(ctx, conn, downloadReceiver) +func StartDownloadReceiver(ctx context.Context, conn *websocket.Conn, start time.Time) (<-chan model.Measurement) { + return startReceiver(ctx, conn, downloadReceiver, start) } // StartUploadReceiver is like StartDownloadReceiver except that it // tolerates incoming binary messages, which are sent to cause // network load, and therefore must not be rejected. -func StartUploadReceiver(ctx context.Context, conn *websocket.Conn) <-chan model.Measurement { - return start(ctx, conn, uploadReceiver) +func StartUploadReceiver(ctx context.Context, conn *websocket.Conn, start time.Time) (<-chan model.Measurement) { + return startReceiver(ctx, conn, uploadReceiver, start) } diff --git a/ndt7/results/file.go b/ndt7/results/file.go index 8786c429..016dc293 100644 --- a/ndt7/results/file.go +++ b/ndt7/results/file.go @@ -66,8 +66,8 @@ func newFile(datadir, what, uuid string) (*File, error) { // containing the metadata. The conn argument is used to retrieve the local and // the remote endpoints addresses. 
The "datadir" argument specifies the // directory on disk to write the data into and the what argument should -// indicate whether this is a spec.SubtestDownload or a spec.SubtestUpload -// ndt7 measurement. +// indicate whether this is a spec.SubtestDownload, a spec.SubtestUpload +// or a spec.SubtestPing ndt7 measurement. func OpenFor(request *http.Request, conn *websocket.Conn, datadir string, what spec.SubtestKind) (*File, error) { meta := make(metadata, 0) netConn := conn.UnderlyingConn() diff --git a/ndt7/spec/spec.go b/ndt7/spec/spec.go index 6d47d812..8d7c8522 100644 --- a/ndt7/spec/spec.go +++ b/ndt7/spec/spec.go @@ -9,6 +9,9 @@ const DownloadURLPath = "/ndt/v7/download" // UploadURLPath selects the upload subtest. const UploadURLPath = "/ndt/v7/upload" +// PingURLPath selects the ping subtest. +const PingURLPath = "/ndt/v7/ping" + // SecWebSocketProtocol is the WebSocket subprotocol used by ndt7. const SecWebSocketProtocol = "net.measurementlab.ndt.v7" @@ -57,4 +60,7 @@ const ( // SubtestUpload is a upload subtest SubtestUpload = SubtestKind("upload") + + // SubtestPing is a ping subtest + SubtestPing = SubtestKind("ping") ) diff --git a/ndt7/upload/sender/sender.go b/ndt7/upload/sender/sender.go index ff46a130..3b101208 100644 --- a/ndt7/upload/sender/sender.go +++ b/ndt7/upload/sender/sender.go @@ -8,13 +8,13 @@ import ( "github.com/m-lab/ndt-server/logging" "github.com/m-lab/ndt-server/ndt7/closer" "github.com/m-lab/ndt-server/ndt7/model" - "github.com/m-lab/ndt-server/ndt7/ping" + "github.com/m-lab/ndt-server/ndt7/ping/message" "github.com/m-lab/ndt-server/ndt7/spec" ) func loop( conn *websocket.Conn, src <-chan model.Measurement, - dst chan<- model.Measurement, + dst chan<- model.Measurement, start time.Time, ) { logging.Logger.Debug("sender: start") defer logging.Logger.Debug("sender: stop") @@ -24,12 +24,15 @@ func loop( // make sure we drain the channel } }() - deadline := time.Now().Add(spec.MaxRuntime) + deadline := start.Add(spec.MaxRuntime) 
err := conn.SetWriteDeadline(deadline) // Liveness! if err != nil { logging.Logger.WithError(err).Warn("sender: conn.SetWriteDeadline failed") return } + // There is no "special" first RTT sample for /upload. The server can't control if the client + // has already started flooding the connection. So, if server sends ping frame here, + // the pong frame may be still affected by HOL, like every other ping-pong frame. for { m, ok := <-src if !ok { // This means that the previous step has terminated @@ -41,8 +44,8 @@ func loop( return } dst <- m // Liveness: this is blocking - if err := ping.SendTicks(conn, deadline); err != nil { - logging.Logger.WithError(err).Warn("sender: ping.SendTicks failed") + if err := message.SendTicks(conn, start, deadline); err != nil { + logging.Logger.WithError(err).Warn("sender: ping.message.SendTicks failed") return } } @@ -55,9 +58,9 @@ func loop( // Liveness guarantee: the sender will not be stuck sending for more then // the MaxRuntime of the subtest, provided that the consumer will // continue reading from the returned channel. This is enforced by -// setting the write deadline to MaxRuntime + time.Now. -func Start(conn *websocket.Conn, src <-chan model.Measurement) <-chan model.Measurement { +// setting the write deadline to |start| + MaxRuntime. +func Start(conn *websocket.Conn, src <-chan model.Measurement, start time.Time) <-chan model.Measurement { dst := make(chan model.Measurement) - go loop(conn, src, dst) + go loop(conn, src, dst, start) return dst } diff --git a/ndt7/upload/upload.go b/ndt7/upload/upload.go index 511c3a70..e31dc83a 100644 --- a/ndt7/upload/upload.go +++ b/ndt7/upload/upload.go @@ -3,6 +3,7 @@ package upload import ( "context" + "time" "github.com/gorilla/websocket" "github.com/m-lab/ndt-server/ndt7/results" @@ -15,13 +16,14 @@ import ( // Do implements the upload subtest. The ctx argument is the parent context // for the subtest. The conn argument is the open WebSocket connection. 
The // resultfp argument is the file where to save results. Both arguments are -// owned by the caller of this function. -func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File) { +// owned by the caller of this function. The start argument is the test +// start time used to calculate ElapsedTime and deadlines. +func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File, start time.Time) { // Implementation note: use child context so that, if we cannot save the // results in the loop below, we terminate the goroutines early wholectx, cancel := context.WithCancel(ctx) defer cancel() - senderch := sender.Start(conn, measurer.Start(wholectx, conn, resultfp.Data.UUID)) - receiverch := receiver.StartUploadReceiver(wholectx, conn) + senderch := sender.Start(conn, measurer.Start(wholectx, conn, resultfp.Data.UUID, start), start) + receiverch := receiver.StartUploadReceiver(wholectx, conn, start) saver.SaveAll(resultfp, senderch, receiverch) } diff --git a/spec/ndt7-protocol.md b/spec/ndt7-protocol.md index 663bfa31..827d0f42 100644 --- a/spec/ndt7-protocol.md +++ b/spec/ndt7-protocol.md @@ -7,18 +7,19 @@ protocol](https://github.com/ndt-project/ndt). Ndt7 is based on WebSocket and TLS, and takes advantage of TCP BBR, where this flavour of TCP is available. -This is version v0.8.3 of the ndt7 specification. +This is version v0.9.0 of the ndt7 specification. ## Design choices (This section is non-normative.) Ndt7 measures the application-level download and upload performance -using WebSockets over TLS. Each test type is independent, and -there are two types of test: the download and the upload tests. Ndt7 -always uses a single TCP connection. Whenever possible, ndt7 uses a recent -version of TCP BBR. Writing an ndt7 client is designed to be as simple -as possible. [A complete Go language ndt7 client]( +using WebSockets over TLS. 
Each test type is independent, and there are +three types of test: the download test, the upload test, and the ping +(latency) test. Ndt7 always uses a single new TCP connection for each type of +test. Whenever possible, ndt7 uses a recent version of TCP BBR. Writing +an ndt7 client is designed to be as simple as possible. [A complete Go +language ndt7 client]( https://github.com/bassosimone/ndt7-client-go-minimal) has been implemented in just 151 lines. We used 26 lines for the download, 33 for the upload, and 17 for establishing a connections. No code from the NDT server has been @@ -68,12 +69,14 @@ servers should behave during the download and the upload tests. The client connects to the server using HTTPS and requests to upgrade the connection to WebSockets. The same connection will be used to exchange control and measurement messages. The upgrade request URL will indicate -the type of test that the client wants to perform. Two tests and -hence two URLs are defined: +the type of test that the client wants to perform. Three tests and +hence three URLs are defined: - `/ndt/v7/download`, which selects the download test; -- `/ndt/v7/upload`, which selects the upload test. +- `/ndt/v7/upload`, which selects the upload test; + +- `/ndt/v7/ping`, which selects the ping test. The upgrade message MUST also contain the WebSocket subprotocol that identifies ndt7, which is `net.measurementlab.ndt.v7`. The URL in the @@ -199,14 +202,14 @@ provide information useful to diagnose performance issues. While in theory we could specify all `TCP_INFO` and `BBR_INFO` variables, different kernel versions provide different subsets of these measurements and we do not want to be needlessly restrictive regarding the underlying -kernel for the server. Instead, +kernel for the server. Instead, our guiding principle is to describe only the variables that in our experience are useful to understand performance issues. More variables could be added in the future. 
No variables should be removed, but, if some are removed, we should document them as being removed rather than removing them from this specification. -Since version v0.8.0 of this specification, the measurement message +Since version v0.9.0 of this specification, the measurement message has the following structure: ```json @@ -222,6 +225,11 @@ has the following structure: }, "Origin": "server", "Test": "download", + "WSPingInfo": { + "ElapsedTime": 1234, + "LastRTT": 1234, + "MinRTT": 1234 + }, "TCPInfo": { "BusyTime": 1234, "BytesAcked": 1234, @@ -281,6 +289,18 @@ Where: current test. This field SHOULD only be used when the current test should otherwise not be obvious. +- `WSPingInfo` is an _optional_ `object` only included in the measurement + when a reasonable websocket-level measurement is available: + + - `ElapsedTime` (an `int64`) is the arrival time of the pong frame, elapsed + since the beginning of this test, measured in microseconds. + + - `LastRTT` (an _optional_ `int64`) is the last observed RTT for the websocket + ping-pong exchange, measured in microseconds. + + - `MinRTT` (an _optional_ `int64`) is the minimum observed RTT for the websocket + ping-pong exchange, measured in microseconds. + - `TCPInfo` is an _optional_ `object` only included in the measurement when it is possible to access `TCP_INFO` stats. It contains: @@ -387,9 +407,11 @@ When the server sends measurement messages, the download becomes: ``` > GET /ndt/v7/download Upgrade: websocket < 101 Switching Protocols +< PingMessage < BinaryMessage < BinaryMessage < TextMessage clientElapsedTime=0.30 s +> PongMessage < BinaryMessage < BinaryMessage < TextMessage clientElapsedTime=0.55 s @@ -645,7 +667,9 @@ of the round-trip time. The buildup of a large queue is unexpected when using BBR. It generally indicates the presence of a bottleneck with a large buffer that's filling as the test proceeds. The `MinRTT` can also be useful to verify we're using a reasonably nearby-server. 
Also, an unreasonably small RTT when -the link is 2G or 3G could indicate a performance enhancing proxy. +the link is 2G or 3G could indicate a performance enhancing proxy; one can +compare `TCPInfo.MinRTT` against `WSPingInfo.MinRTT` to get additional evidence +supporting this case. The times (`BusyTime`, `RWndLimited`, and `SndBufLimited`) are useful to understand where the bottleneck could be. In general we would like to see @@ -679,3 +703,18 @@ packet is uniformly distributed, which isn't likely the case. Yet, it may be an useful first order information to characterise a network as possibly very lossy. Some packet loss is normal and healthy, but too much packet loss is the sign of a network path with systemic problems. + +### Measuring latency + +When TCP-level proxies are present, application-level (L7) means are needed to +measure end-to-end latency in addition to end-to-end bandwidth. Such +proxies may include ISP-level performance-enhancing proxies, OpenSSH, +the Tor anonymity network, and many others. + +`WSPingInfo.LastRTT` samples may be affected by the payload during download +and upload tests, as queued BinaryMessages may delay either the ping +or the pong frame. The ping test does not send any BinaryMessage payload, so `WSPingInfo` +RTT measurements should be reasonably accurate (unless it's practical +for the client to delay pong frames). The very first `WSPingInfo` sample +collected during the download test also has a chance to be accurate as +the ping frame SHOULD precede any BinaryMessages in that case.