diff --git a/ndt7/download/download.go b/ndt7/download/download.go index c67aee3e..0006c7db 100644 --- a/ndt7/download/download.go +++ b/ndt7/download/download.go @@ -3,6 +3,7 @@ package download import ( "context" + "time" "github.com/gorilla/websocket" "github.com/m-lab/ndt-server/ndt7/download/sender" @@ -15,14 +16,15 @@ import ( // Do implements the download subtest. The ctx argument is the parent // context for the subtest. The conn argument is the open WebSocket // connection. The resultfp argument is the file where to save results. Both -// arguments are owned by the caller of this function. -func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File) { +// arguments are owned by the caller of this function. The start argument is +// the test start time used to calculate ElapsedTime and deadlines. +func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File, start time.Time) { // Implementation note: use child context so that, if we cannot save the // results in the loop below, we terminate the goroutines early - wholectx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(ctx) defer cancel() - measurer := measurer.New(conn, resultfp.Data.UUID) - senderch := sender.Start(conn, measurer.Start(ctx)) - receiverch := receiver.StartDownloadReceiver(wholectx, conn) + measurer := measurer.New(conn, resultfp.Data.UUID, start) + senderch := sender.Start(conn, measurer.Start(ctx), start) + receiverch := receiver.StartDownloadReceiver(ctx, conn, start) saver.SaveAll(resultfp, senderch, receiverch) } diff --git a/ndt7/download/sender/sender.go b/ndt7/download/sender/sender.go index 5c4efc03..9f9f86cc 100644 --- a/ndt7/download/sender/sender.go +++ b/ndt7/download/sender/sender.go @@ -9,7 +9,7 @@ import ( "github.com/m-lab/ndt-server/logging" "github.com/m-lab/ndt-server/ndt7/closer" "github.com/m-lab/ndt-server/ndt7/model" - "github.com/m-lab/ndt-server/ndt7/ping" + "github.com/m-lab/ndt-server/ndt7/ping/message" "github.com/m-lab/ndt-server/ndt7/spec" ) @@ -22,7 +22,7 @@ func makePreparedMessage(size int) (*websocket.PreparedMessage, error) { return websocket.NewPreparedMessage(websocket.BinaryMessage, data) } -func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.Measurement) { +func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.Measurement, start time.Time) { logging.Logger.Debug("sender: start") defer logging.Logger.Debug("sender: stop") defer close(dst) @@ -38,12 +38,18 @@ func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.M logging.Logger.WithError(err).Warn("sender: makePreparedMessage failed") return } - deadline := time.Now().Add(spec.MaxRuntime) + deadline := start.Add(spec.MaxRuntime) err = conn.SetWriteDeadline(deadline) // Liveness! if err != nil { logging.Logger.WithError(err).Warn("sender: conn.SetWriteDeadline failed") return } + // One RTT sample is taken before flooding the connection with data. + // That sample is not affected by HOL, so it has additional value and is treated specially. + if err := message.SendTicks(conn, start, deadline); err != nil { + logging.Logger.WithError(err).Warn("sender: ping.message.SendTicks failed") + return + } var totalSent int64 for { select { @@ -57,8 +63,8 @@ func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.M return } dst <- m // Liveness: this is blocking - if err := ping.SendTicks(conn, deadline); err != nil { - logging.Logger.WithError(err).Warn("sender: ping.SendTicks failed") + if err := message.SendTicks(conn, start, deadline); err != nil { + logging.Logger.WithError(err).Warn("sender: ping.message.SendTicks failed") return } default: @@ -99,9 +105,9 @@ func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.M // Liveness guarantee: the sender will not be stuck sending for more then // the MaxRuntime of the subtest, provided that the consumer will // continue reading from the returned channel. This is enforced by -// setting the write deadline to Time.Now() + MaxRuntime. -func Start(conn *websocket.Conn, src <-chan model.Measurement) <-chan model.Measurement { +// setting the write deadline to |start| + MaxRuntime. +func Start(conn *websocket.Conn, src <-chan model.Measurement, start time.Time) <-chan model.Measurement { dst := make(chan model.Measurement) - go loop(conn, src, dst) + go loop(conn, src, dst, start) return dst } diff --git a/ndt7/handler/handler.go b/ndt7/handler/handler.go index 0d7a522f..ea624885 100644 --- a/ndt7/handler/handler.go +++ b/ndt7/handler/handler.go @@ -36,8 +36,9 @@ func warnAndClose(writer http.ResponseWriter, message string) { // testerFunc is the function implementing a subtest. The first argument // is the subtest context. The second argument is the connected websocket. The // third argument is the open file where to write results. This function does -// not own the second or the third argument. -type testerFunc = func(context.Context, *websocket.Conn, *results.File) +// not own the second or the third argument. The fourth argument is the base +// start time of the test. +type testerFunc = func(context.Context, *websocket.Conn, *results.File, time.Time) // downloadOrUpload implements both download and upload. The writer argument // is the HTTP response writer. The request argument is the HTTP request @@ -114,7 +115,7 @@ func (h Handler) downloadOrUpload(writer http.ResponseWriter, request *http.Requ } warnonerror.Close(resultfp, string(kind)+": ignoring resultfp.Close error") }() - tester(request.Context(), conn, resultfp) + tester(request.Context(), conn, resultfp, result.StartTime) } // Download handles the download subtest. diff --git a/ndt7/measurer/measurer.go b/ndt7/measurer/measurer.go index 9fd8f4ec..b67a2cea 100644 --- a/ndt7/measurer/measurer.go +++ b/ndt7/measurer/measurer.go @@ -20,15 +20,17 @@ import ( // Measurer performs measurements type Measurer struct { - conn *websocket.Conn - uuid string + conn *websocket.Conn + start time.Time + uuid string } // New creates a new measurer instance -func New(conn *websocket.Conn, UUID string) *Measurer { +func New(conn *websocket.Conn, UUID string, start time.Time) *Measurer { return &Measurer{ - conn: conn, - uuid: UUID, + conn: conn, + start: start, + uuid: UUID, } } @@ -80,7 +82,6 @@ func (m *Measurer) loop(ctx context.Context, dst chan<- model.Measurement) { return } defer sockfp.Close() - start := time.Now() connectionInfo := &model.ConnectionInfo{ Client: m.conn.RemoteAddr().String(), Server: m.conn.LocalAddr().String(), @@ -104,7 +105,7 @@ func (m *Measurer) loop(ctx context.Context, dst chan<- model.Measurement) { return } var measurement model.Measurement - measure(&measurement, sockfp, now.Sub(start)) + measure(&measurement, sockfp, now.Sub(m.start)) measurement.ConnectionInfo = connectionInfo dst <- measurement // Liveness: this is blocking } diff --git a/ndt7/ping/message/message.go b/ndt7/ping/message/message.go new file mode 100644 index 00000000..d8cb5725 --- /dev/null +++ b/ndt7/ping/message/message.go @@ -0,0 +1,47 @@ +// Package message implements operations with WebSocket PING messages. +package message + +import ( + "encoding/json" + "errors" + "time" + + "github.com/gorilla/websocket" +) + +// The json object is used as a namespace to avoid erratic interpretation of +// unsolicited pong frames. Ping and pong frames are not a part of +// Sec-WebSocket-Protocol, they're part of RFC6455. Section 5.5.3 of the RFC +// allows unsolicited pong frames. Some browsers are known to send unsolicited +// pong frames, see golang/go#6377 . +type pingMessage struct { + Ndt7TS int64 +} + +// SendTicks sends the current ticks as a ping message. +func SendTicks(conn *websocket.Conn, start time.Time, deadline time.Time) error { + msg := pingMessage{ + Ndt7TS: time.Since(start).Nanoseconds(), + } + data, err := json.Marshal(msg) + if err == nil { + err = conn.WriteControl(websocket.PingMessage, data, deadline) + } + return err +} + +func ParseTicks(s string, start time.Time) (elapsed time.Duration, d time.Duration, err error) { + elapsed = time.Since(start) + var msg pingMessage + err = json.Unmarshal([]byte(s), &msg) + if err != nil { + return + } + prev := msg.Ndt7TS + if 0 <= prev && prev <= elapsed.Nanoseconds() { + d = time.Duration(elapsed.Nanoseconds() - prev) + } else { + err = errors.New("RTT is negative") + } + return +} diff --git a/ndt7/ping/ping.go b/ndt7/ping/ping.go deleted file mode 100644 index 2af7c7ff..00000000 --- a/ndt7/ping/ping.go +++ /dev/null @@ -1,32 +0,0 @@ -// Package ping implements WebSocket PING messages. -package ping - -import ( - "encoding/json" - "time" - - "github.com/gorilla/websocket" -) - -// SendTicks sends the current ticks as a ping message. -func SendTicks(conn *websocket.Conn, deadline time.Time) error { - // TODO(bassosimone): when we'll have a unique base time.Time reference for - // the whole test, we should use that, since UnixNano() is not monotonic. - ticks := int64(time.Now().UnixNano()) - data, err := json.Marshal(ticks) - if err == nil { - err = conn.WriteControl(websocket.PingMessage, data, deadline) - } - return err -} - -func ParseTicks(s string) (d int64, err error) { - // TODO(bassosimone): when we'll have a unique base time.Time reference for - // the whole test, we should use that, since UnixNano() is not monotonic. - var prev int64 - err = json.Unmarshal([]byte(s), &prev) - if err == nil { - d = (int64(time.Now().UnixNano()) - prev) - } - return -} diff --git a/ndt7/receiver/receiver.go b/ndt7/receiver/receiver.go index ab423ed2..d649ec00 100644 --- a/ndt7/receiver/receiver.go +++ b/ndt7/receiver/receiver.go @@ -10,7 +10,7 @@ import ( "github.com/gorilla/websocket" "github.com/m-lab/ndt-server/logging" "github.com/m-lab/ndt-server/ndt7/model" - "github.com/m-lab/ndt-server/ndt7/ping" + "github.com/m-lab/ndt-server/ndt7/ping/message" "github.com/m-lab/ndt-server/ndt7/spec" ) @@ -23,7 +23,7 @@ const ( func loop( ctx context.Context, conn *websocket.Conn, kind receiverKind, - dst chan<- model.Measurement, + dst chan<- model.Measurement, start time.Time, ) { logging.Logger.Debug("receiver: start") defer logging.Logger.Debug("receiver: stop") @@ -31,16 +31,17 @@ func loop( conn.SetReadLimit(spec.MaxMessageSize) receiverctx, cancel := context.WithTimeout(ctx, spec.MaxRuntime) defer cancel() - err := conn.SetReadDeadline(time.Now().Add(spec.MaxRuntime)) // Liveness! + err := conn.SetReadDeadline(start.Add(spec.MaxRuntime)) // Liveness! if err != nil { logging.Logger.WithError(err).Warn("receiver: conn.SetReadDeadline failed") return } conn.SetPongHandler(func(s string) error { - rtt, err := ping.ParseTicks(s) + _, rtt, err := message.ParseTicks(s, start) if err == nil { - rtt /= int64(time.Millisecond) - logging.Logger.Debugf("receiver: ApplicationLevel RTT: %d ms", rtt) + // Writing rtt to |dst| will write the Measurement to `ClientMeasurements` object. + // That goes against data format, so the value is just logged. + logging.Logger.Debugf("receiver: ApplicationLevel RTT: %d ms", int64(rtt / time.Millisecond)) } return err }) @@ -72,9 +73,9 @@ func loop( } } -func start(ctx context.Context, conn *websocket.Conn, kind receiverKind) <-chan model.Measurement { +func startReceiver(ctx context.Context, conn *websocket.Conn, kind receiverKind, start time.Time) (<-chan model.Measurement) { dst := make(chan model.Measurement) - go loop(ctx, conn, kind, dst) + go loop(ctx, conn, kind, dst, start) return dst } @@ -87,13 +88,13 @@ func start(ctx context.Context, conn *websocket.Conn, kind receiverKind) <-chan // Liveness guarantee: the goroutine will always terminate after a // MaxRuntime timeout, provided that the consumer will keep reading // from the returned channel. -func StartDownloadReceiver(ctx context.Context, conn *websocket.Conn) <-chan model.Measurement { - return start(ctx, conn, downloadReceiver) +func StartDownloadReceiver(ctx context.Context, conn *websocket.Conn, start time.Time) (<-chan model.Measurement) { + return startReceiver(ctx, conn, downloadReceiver, start) } // StartUploadReceiver is like StartDownloadReceiver except that it // tolerates incoming binary messages, which are sent to cause // network load, and therefore must not be rejected. -func StartUploadReceiver(ctx context.Context, conn *websocket.Conn) <-chan model.Measurement { - return start(ctx, conn, uploadReceiver) +func StartUploadReceiver(ctx context.Context, conn *websocket.Conn, start time.Time) (<-chan model.Measurement) { + return startReceiver(ctx, conn, uploadReceiver, start) } diff --git a/ndt7/upload/sender/sender.go b/ndt7/upload/sender/sender.go index ff46a130..3b101208 100644 --- a/ndt7/upload/sender/sender.go +++ b/ndt7/upload/sender/sender.go @@ -8,13 +8,13 @@ import ( "github.com/m-lab/ndt-server/logging" "github.com/m-lab/ndt-server/ndt7/closer" "github.com/m-lab/ndt-server/ndt7/model" - "github.com/m-lab/ndt-server/ndt7/ping" + "github.com/m-lab/ndt-server/ndt7/ping/message" "github.com/m-lab/ndt-server/ndt7/spec" ) func loop( conn *websocket.Conn, src <-chan model.Measurement, - dst chan<- model.Measurement, + dst chan<- model.Measurement, start time.Time, ) { logging.Logger.Debug("sender: start") defer logging.Logger.Debug("sender: stop") @@ -24,12 +24,15 @@ func loop( // make sure we drain the channel } }() - deadline := time.Now().Add(spec.MaxRuntime) + deadline := start.Add(spec.MaxRuntime) err := conn.SetWriteDeadline(deadline) // Liveness! if err != nil { logging.Logger.WithError(err).Warn("sender: conn.SetWriteDeadline failed") return } + // There is no "special" first RTT sample for /upload. The server can't control if the client + // has already started flooding the connection. So, if server sends ping frame here, + // the pong frame may be still affected by HOL, like every other ping-pong frame. for { m, ok := <-src if !ok { // This means that the previous step has terminated @@ -41,8 +44,8 @@ func loop( return } dst <- m // Liveness: this is blocking - if err := ping.SendTicks(conn, deadline); err != nil { - logging.Logger.WithError(err).Warn("sender: ping.SendTicks failed") + if err := message.SendTicks(conn, start, deadline); err != nil { + logging.Logger.WithError(err).Warn("sender: ping.message.SendTicks failed") return } } @@ -55,9 +58,9 @@ func loop( // Liveness guarantee: the sender will not be stuck sending for more then // the MaxRuntime of the subtest, provided that the consumer will // continue reading from the returned channel. This is enforced by -// setting the write deadline to MaxRuntime + time.Now. -func Start(conn *websocket.Conn, src <-chan model.Measurement) <-chan model.Measurement { +// setting the write deadline to |start| + MaxRuntime. +func Start(conn *websocket.Conn, src <-chan model.Measurement, start time.Time) <-chan model.Measurement { dst := make(chan model.Measurement) - go loop(conn, src, dst) + go loop(conn, src, dst, start) return dst } diff --git a/ndt7/upload/upload.go b/ndt7/upload/upload.go index d67b701f..01d363d9 100644 --- a/ndt7/upload/upload.go +++ b/ndt7/upload/upload.go @@ -3,6 +3,7 @@ package upload import ( "context" + "time" "github.com/gorilla/websocket" "github.com/m-lab/ndt-server/ndt7/measurer" @@ -15,14 +16,15 @@ import ( // Do implements the upload subtest. The ctx argument is the parent context // for the subtest. The conn argument is the open WebSocket connection. The // resultfp argument is the file where to save results. Both arguments are -// owned by the caller of this function. -func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File) { +// owned by the caller of this function. The start argument is the test +// start time used to calculate ElapsedTime and deadlines. +func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File, start time.Time) { // Implementation note: use child context so that, if we cannot save the // results in the loop below, we terminate the goroutines early - wholectx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(ctx) defer cancel() - measurer := measurer.New(conn, resultfp.Data.UUID) - senderch := sender.Start(conn, measurer.Start(ctx)) - receiverch := receiver.StartUploadReceiver(wholectx, conn) + measurer := measurer.New(conn, resultfp.Data.UUID, start) + senderch := sender.Start(conn, measurer.Start(ctx), start) + receiverch := receiver.StartUploadReceiver(ctx, conn, start) saver.SaveAll(resultfp, senderch, receiverch) }