From 2c32a60105cfd02f5e3008c69828e42c276a0353 Mon Sep 17 00:00:00 2001 From: Leonid Evdokimov Date: Mon, 10 Feb 2020 12:26:22 +0000 Subject: [PATCH] ndt7: start improving ping implementation This diff has been written by @darkk as part of #234 and has been extracted from #234 by @bassosimone, with the objective of merging these low hanging fruits while we look into the proper way of implementing the `/ping` endpoint. Closes #242 --- ndt7/download/download.go | 10 +++++--- ndt7/download/sender/sender.go | 22 ++++++++++------ ndt7/handler/handler.go | 7 ++--- ndt7/measurer/measurer.go | 7 +++-- ndt7/ping/message/message.go | 47 ++++++++++++++++++++++++++++++++++ ndt7/ping/ping.go | 32 ----------------------- ndt7/receiver/receiver.go | 25 +++++++++--------- ndt7/upload/sender/sender.go | 19 ++++++++------ ndt7/upload/upload.go | 10 +++++--- 9 files changed, 104 insertions(+), 75 deletions(-) create mode 100644 ndt7/ping/message/message.go delete mode 100644 ndt7/ping/ping.go diff --git a/ndt7/download/download.go b/ndt7/download/download.go index 832d7a21..510de9e0 100644 --- a/ndt7/download/download.go +++ b/ndt7/download/download.go @@ -3,6 +3,7 @@ package download import ( "context" + "time" "github.com/gorilla/websocket" "github.com/m-lab/ndt-server/ndt7/download/sender" @@ -15,13 +16,14 @@ import ( // Do implements the download subtest. The ctx argument is the parent // context for the subtest. The conn argument is the open WebSocket // connection. The resultfp argument is the file where to save results. Both -// arguments are owned by the caller of this function. -func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File) { +// arguments are owned by the caller of this function. The start argument is +// the test start time used to calculate ElapsedTime and deadlines. 
+func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File, start time.Time) { // Implementation note: use child context so that, if we cannot save the // results in the loop below, we terminate the goroutines early wholectx, cancel := context.WithCancel(ctx) defer cancel() - senderch := sender.Start(conn, measurer.Start(wholectx, conn, resultfp.Data.UUID)) - receiverch := receiver.StartDownloadReceiver(wholectx, conn) + senderch := sender.Start(conn, measurer.Start(wholectx, conn, resultfp.Data.UUID, start), start) + receiverch := receiver.StartDownloadReceiver(wholectx, conn, start) saver.SaveAll(resultfp, senderch, receiverch) } diff --git a/ndt7/download/sender/sender.go b/ndt7/download/sender/sender.go index 5c4efc03..9f9f86cc 100644 --- a/ndt7/download/sender/sender.go +++ b/ndt7/download/sender/sender.go @@ -9,7 +9,7 @@ import ( "github.com/m-lab/ndt-server/logging" "github.com/m-lab/ndt-server/ndt7/closer" "github.com/m-lab/ndt-server/ndt7/model" - "github.com/m-lab/ndt-server/ndt7/ping" + "github.com/m-lab/ndt-server/ndt7/ping/message" "github.com/m-lab/ndt-server/ndt7/spec" ) @@ -22,7 +22,7 @@ func makePreparedMessage(size int) (*websocket.PreparedMessage, error) { return websocket.NewPreparedMessage(websocket.BinaryMessage, data) } -func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.Measurement) { +func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.Measurement, start time.Time) { logging.Logger.Debug("sender: start") defer logging.Logger.Debug("sender: stop") defer close(dst) @@ -38,12 +38,18 @@ func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.M logging.Logger.WithError(err).Warn("sender: makePreparedMessage failed") return } - deadline := time.Now().Add(spec.MaxRuntime) + deadline := start.Add(spec.MaxRuntime) err = conn.SetWriteDeadline(deadline) // Liveness! 
if err != nil { logging.Logger.WithError(err).Warn("sender: conn.SetWriteDeadline failed") return } + // One RTT sample is taken before flooding the connection with data. + // That sample is not affected by HOL, so it has additional value and is treated specially. + if err := message.SendTicks(conn, start, deadline); err != nil { + logging.Logger.WithError(err).Warn("sender: ping.message.SendTicks failed") + return + } var totalSent int64 for { select { @@ -57,8 +63,8 @@ func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.M return } dst <- m // Liveness: this is blocking - if err := ping.SendTicks(conn, deadline); err != nil { - logging.Logger.WithError(err).Warn("sender: ping.SendTicks failed") + if err := message.SendTicks(conn, start, deadline); err != nil { + logging.Logger.WithError(err).Warn("sender: ping.message.SendTicks failed") return } default: @@ -99,9 +105,9 @@ func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.M // Liveness guarantee: the sender will not be stuck sending for more then // the MaxRuntime of the subtest, provided that the consumer will // continue reading from the returned channel. This is enforced by -// setting the write deadline to Time.Now() + MaxRuntime. -func Start(conn *websocket.Conn, src <-chan model.Measurement) <-chan model.Measurement { +// setting the write deadline to |start| + MaxRuntime. +func Start(conn *websocket.Conn, src <-chan model.Measurement, start time.Time) <-chan model.Measurement { dst := make(chan model.Measurement) - go loop(conn, src, dst) + go loop(conn, src, dst, start) return dst } diff --git a/ndt7/handler/handler.go b/ndt7/handler/handler.go index 0d7a522f..ea624885 100644 --- a/ndt7/handler/handler.go +++ b/ndt7/handler/handler.go @@ -36,8 +36,9 @@ func warnAndClose(writer http.ResponseWriter, message string) { // testerFunc is the function implementing a subtest. The first argument // is the subtest context. 
The second argument is the connected websocket. The // third argument is the open file where to write results. This function does -// not own the second or the third argument. -type testerFunc = func(context.Context, *websocket.Conn, *results.File) +// not own the second or the third argument. The fourth argument is the base +// start time of the test. +type testerFunc = func(context.Context, *websocket.Conn, *results.File, time.Time) // downloadOrUpload implements both download and upload. The writer argument // is the HTTP response writer. The request argument is the HTTP request @@ -114,7 +115,7 @@ func (h Handler) downloadOrUpload(writer http.ResponseWriter, request *http.Requ } warnonerror.Close(resultfp, string(kind)+": ignoring resultfp.Close error") }() - tester(request.Context(), conn, resultfp) + tester(request.Context(), conn, resultfp, result.StartTime) } // Download handles the download subtest. diff --git a/ndt7/measurer/measurer.go b/ndt7/measurer/measurer.go index da1afa64..fc4c3f2b 100644 --- a/ndt7/measurer/measurer.go +++ b/ndt7/measurer/measurer.go @@ -54,7 +54,7 @@ func measure(measurement *model.Measurement, sockfp *os.File, elapsed time.Durat } } -func loop(ctx context.Context, conn *websocket.Conn, UUID string, dst chan<- model.Measurement) { +func loop(ctx context.Context, conn *websocket.Conn, UUID string, dst chan<- model.Measurement, start time.Time) { logging.Logger.Debug("measurer: start") defer logging.Logger.Debug("measurer: stop") defer close(dst) @@ -66,7 +66,6 @@ func loop(ctx context.Context, conn *websocket.Conn, UUID string, dst chan<- mod return } defer sockfp.Close() - start := time.Now() connectionInfo := &model.ConnectionInfo{ Client: conn.RemoteAddr().String(), Server: conn.LocalAddr().String(), @@ -104,9 +103,9 @@ func loop(ctx context.Context, conn *websocket.Conn, UUID string, dst chan<- mod // a timeout of DefaultRuntime seconds, provided that the consumer // continues reading from the returned channel. 
func Start( - ctx context.Context, conn *websocket.Conn, UUID string, + ctx context.Context, conn *websocket.Conn, UUID string, start time.Time, ) <-chan model.Measurement { dst := make(chan model.Measurement) - go loop(ctx, conn, UUID, dst) + go loop(ctx, conn, UUID, dst, start) return dst } diff --git a/ndt7/ping/message/message.go b/ndt7/ping/message/message.go new file mode 100644 index 00000000..d8cb5725 --- /dev/null +++ b/ndt7/ping/message/message.go @@ -0,0 +1,47 @@ +// Package message implements operations with WebSocket PING messages. +package message + +import ( + "encoding/json" + "errors" + "time" + + "github.com/gorilla/websocket" +) + +// The json object is used as a namespace to avoid erratic interpretation of +// unsolicited pong frames. Ping and pong frames are not a part of +// Sec-WebSocket-Protocol, they're part of RFC6455. Section 5.5.3 of the RFC +// allows unsolicited pong frames. Some browsers are known to send unsolicited +// pong frames, see golang/go#6377 . +type pingMessage struct { + Ndt7TS int64 +} + +// SendTicks sends the current ticks as a ping message. 
+func SendTicks(conn *websocket.Conn, start time.Time, deadline time.Time) error { + msg := pingMessage{ + Ndt7TS: time.Since(start).Nanoseconds(), + } + data, err := json.Marshal(msg) + if err == nil { + err = conn.WriteControl(websocket.PingMessage, data, deadline) + } + return err +} + +func ParseTicks(s string, start time.Time) (elapsed time.Duration, d time.Duration, err error) { + elapsed = time.Since(start) + var msg pingMessage + err = json.Unmarshal([]byte(s), &msg) + if err != nil { + return + } + prev := msg.Ndt7TS + if 0 <= prev && prev <= elapsed.Nanoseconds() { + d = time.Duration(elapsed.Nanoseconds() - prev) + } else { + err = errors.New("RTT is negative") + } + return +} diff --git a/ndt7/ping/ping.go b/ndt7/ping/ping.go deleted file mode 100644 index 2af7c7ff..00000000 --- a/ndt7/ping/ping.go +++ /dev/null @@ -1,32 +0,0 @@ -// Package ping implements WebSocket PING messages. -package ping - -import ( - "encoding/json" - "time" - - "github.com/gorilla/websocket" -) - -// SendTicks sends the current ticks as a ping message. -func SendTicks(conn *websocket.Conn, deadline time.Time) error { - // TODO(bassosimone): when we'll have a unique base time.Time reference for - // the whole test, we should use that, since UnixNano() is not monotonic. - ticks := int64(time.Now().UnixNano()) - data, err := json.Marshal(ticks) - if err == nil { - err = conn.WriteControl(websocket.PingMessage, data, deadline) - } - return err -} - -func ParseTicks(s string) (d int64, err error) { - // TODO(bassosimone): when we'll have a unique base time.Time reference for - // the whole test, we should use that, since UnixNano() is not monotonic. 
- var prev int64 - err = json.Unmarshal([]byte(s), &prev) - if err == nil { - d = (int64(time.Now().UnixNano()) - prev) - } - return -} diff --git a/ndt7/receiver/receiver.go b/ndt7/receiver/receiver.go index ab423ed2..d649ec00 100644 --- a/ndt7/receiver/receiver.go +++ b/ndt7/receiver/receiver.go @@ -10,7 +10,7 @@ import ( "github.com/gorilla/websocket" "github.com/m-lab/ndt-server/logging" "github.com/m-lab/ndt-server/ndt7/model" - "github.com/m-lab/ndt-server/ndt7/ping" + "github.com/m-lab/ndt-server/ndt7/ping/message" "github.com/m-lab/ndt-server/ndt7/spec" ) @@ -23,7 +23,7 @@ const ( func loop( ctx context.Context, conn *websocket.Conn, kind receiverKind, - dst chan<- model.Measurement, + dst chan<- model.Measurement, start time.Time, ) { logging.Logger.Debug("receiver: start") defer logging.Logger.Debug("receiver: stop") @@ -31,16 +31,17 @@ func loop( conn.SetReadLimit(spec.MaxMessageSize) receiverctx, cancel := context.WithTimeout(ctx, spec.MaxRuntime) defer cancel() - err := conn.SetReadDeadline(time.Now().Add(spec.MaxRuntime)) // Liveness! + err := conn.SetReadDeadline(start.Add(spec.MaxRuntime)) // Liveness! if err != nil { logging.Logger.WithError(err).Warn("receiver: conn.SetReadDeadline failed") return } conn.SetPongHandler(func(s string) error { - rtt, err := ping.ParseTicks(s) + _, rtt, err := message.ParseTicks(s, start) if err == nil { - rtt /= int64(time.Millisecond) - logging.Logger.Debugf("receiver: ApplicationLevel RTT: %d ms", rtt) + // Writing rtt to |dst| will write the Measurement to `ClientMeasurements` object. + // That goes against data format, so the value is just logged. 
+ logging.Logger.Debugf("receiver: ApplicationLevel RTT: %d ms", int64(rtt / time.Millisecond)) } return err }) @@ -72,9 +73,9 @@ func loop( } } -func start(ctx context.Context, conn *websocket.Conn, kind receiverKind) <-chan model.Measurement { +func startReceiver(ctx context.Context, conn *websocket.Conn, kind receiverKind, start time.Time) (<-chan model.Measurement) { dst := make(chan model.Measurement) - go loop(ctx, conn, kind, dst) + go loop(ctx, conn, kind, dst, start) return dst } @@ -87,13 +88,13 @@ func start(ctx context.Context, conn *websocket.Conn, kind receiverKind) <-chan // Liveness guarantee: the goroutine will always terminate after a // MaxRuntime timeout, provided that the consumer will keep reading // from the returned channel. -func StartDownloadReceiver(ctx context.Context, conn *websocket.Conn) <-chan model.Measurement { - return start(ctx, conn, downloadReceiver) +func StartDownloadReceiver(ctx context.Context, conn *websocket.Conn, start time.Time) (<-chan model.Measurement) { + return startReceiver(ctx, conn, downloadReceiver, start) } // StartUploadReceiver is like StartDownloadReceiver except that it // tolerates incoming binary messages, which are sent to cause // network load, and therefore must not be rejected. 
-func StartUploadReceiver(ctx context.Context, conn *websocket.Conn) <-chan model.Measurement { - return start(ctx, conn, uploadReceiver) +func StartUploadReceiver(ctx context.Context, conn *websocket.Conn, start time.Time) (<-chan model.Measurement) { + return startReceiver(ctx, conn, uploadReceiver, start) } diff --git a/ndt7/upload/sender/sender.go b/ndt7/upload/sender/sender.go index ff46a130..3b101208 100644 --- a/ndt7/upload/sender/sender.go +++ b/ndt7/upload/sender/sender.go @@ -8,13 +8,13 @@ import ( "github.com/m-lab/ndt-server/logging" "github.com/m-lab/ndt-server/ndt7/closer" "github.com/m-lab/ndt-server/ndt7/model" - "github.com/m-lab/ndt-server/ndt7/ping" + "github.com/m-lab/ndt-server/ndt7/ping/message" "github.com/m-lab/ndt-server/ndt7/spec" ) func loop( conn *websocket.Conn, src <-chan model.Measurement, - dst chan<- model.Measurement, + dst chan<- model.Measurement, start time.Time, ) { logging.Logger.Debug("sender: start") defer logging.Logger.Debug("sender: stop") @@ -24,12 +24,15 @@ func loop( // make sure we drain the channel } }() - deadline := time.Now().Add(spec.MaxRuntime) + deadline := start.Add(spec.MaxRuntime) err := conn.SetWriteDeadline(deadline) // Liveness! if err != nil { logging.Logger.WithError(err).Warn("sender: conn.SetWriteDeadline failed") return } + // There is no "special" first RTT sample for /upload. The server can't control if the client + // has already started flooding the connection. So, if server sends ping frame here, + // the pong frame may be still affected by HOL, like every other ping-pong frame. 
for { m, ok := <-src if !ok { // This means that the previous step has terminated @@ -41,8 +44,8 @@ func loop( return } dst <- m // Liveness: this is blocking - if err := ping.SendTicks(conn, deadline); err != nil { - logging.Logger.WithError(err).Warn("sender: ping.SendTicks failed") + if err := message.SendTicks(conn, start, deadline); err != nil { + logging.Logger.WithError(err).Warn("sender: ping.message.SendTicks failed") return } } @@ -55,9 +58,9 @@ func loop( // Liveness guarantee: the sender will not be stuck sending for more then // the MaxRuntime of the subtest, provided that the consumer will // continue reading from the returned channel. This is enforced by -// setting the write deadline to MaxRuntime + time.Now. -func Start(conn *websocket.Conn, src <-chan model.Measurement) <-chan model.Measurement { +// setting the write deadline to |start| + MaxRuntime. +func Start(conn *websocket.Conn, src <-chan model.Measurement, start time.Time) <-chan model.Measurement { dst := make(chan model.Measurement) - go loop(conn, src, dst) + go loop(conn, src, dst, start) return dst } diff --git a/ndt7/upload/upload.go b/ndt7/upload/upload.go index 511c3a70..e31dc83a 100644 --- a/ndt7/upload/upload.go +++ b/ndt7/upload/upload.go @@ -3,6 +3,7 @@ package upload import ( "context" + "time" "github.com/gorilla/websocket" "github.com/m-lab/ndt-server/ndt7/results" @@ -15,13 +16,14 @@ import ( // Do implements the upload subtest. The ctx argument is the parent context // for the subtest. The conn argument is the open WebSocket connection. The // resultfp argument is the file where to save results. Both arguments are -// owned by the caller of this function. -func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File) { +// owned by the caller of this function. The start argument is the test +// start time used to calculate ElapsedTime and deadlines. 
+func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File, start time.Time) { // Implementation note: use child context so that, if we cannot save the // results in the loop below, we terminate the goroutines early wholectx, cancel := context.WithCancel(ctx) defer cancel() - senderch := sender.Start(conn, measurer.Start(wholectx, conn, resultfp.Data.UUID)) - receiverch := receiver.StartUploadReceiver(wholectx, conn) + senderch := sender.Start(conn, measurer.Start(wholectx, conn, resultfp.Data.UUID, start), start) + receiverch := receiver.StartUploadReceiver(wholectx, conn, start) saver.SaveAll(resultfp, senderch, receiverch) }