Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions ndt7/download/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package download

import (
"context"
"time"

"github.com/gorilla/websocket"
"github.com/m-lab/ndt-server/ndt7/download/sender"
Expand All @@ -15,14 +16,15 @@ import (
// Do implements the download subtest. The ctx argument is the parent
// context for the subtest. The conn argument is the open WebSocket
// connection. The resultfp argument is the file where to save results. Both
// arguments are owned by the caller of this function.
func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File) {
// arguments are owned by the caller of this function. The start argument is
// the test start time used to calculate ElapsedTime and deadlines.
func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File, start time.Time) {
// Implementation note: use child context so that, if we cannot save the
// results in the loop below, we terminate the goroutines early
wholectx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
measurer := measurer.New(conn, resultfp.Data.UUID)
senderch := sender.Start(conn, measurer.Start(ctx))
receiverch := receiver.StartDownloadReceiver(wholectx, conn)
measurer := measurer.New(conn, resultfp.Data.UUID, start)
senderch := sender.Start(conn, measurer.Start(ctx), start)
receiverch := receiver.StartDownloadReceiver(ctx, conn, start)
saver.SaveAll(resultfp, senderch, receiverch)
}
22 changes: 14 additions & 8 deletions ndt7/download/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/m-lab/ndt-server/logging"
"github.com/m-lab/ndt-server/ndt7/closer"
"github.com/m-lab/ndt-server/ndt7/model"
"github.com/m-lab/ndt-server/ndt7/ping"
"github.com/m-lab/ndt-server/ndt7/ping/message"
"github.com/m-lab/ndt-server/ndt7/spec"
)

Expand All @@ -22,7 +22,7 @@ func makePreparedMessage(size int) (*websocket.PreparedMessage, error) {
return websocket.NewPreparedMessage(websocket.BinaryMessage, data)
}

func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.Measurement) {
func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.Measurement, start time.Time) {
logging.Logger.Debug("sender: start")
defer logging.Logger.Debug("sender: stop")
defer close(dst)
Expand All @@ -38,12 +38,18 @@ func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.M
logging.Logger.WithError(err).Warn("sender: makePreparedMessage failed")
return
}
deadline := time.Now().Add(spec.MaxRuntime)
deadline := start.Add(spec.MaxRuntime)
err = conn.SetWriteDeadline(deadline) // Liveness!
if err != nil {
logging.Logger.WithError(err).Warn("sender: conn.SetWriteDeadline failed")
return
}
// One RTT sample is taken before flooding the connection with data.
// That sample is not affected by HOL, so it has additional value and is treated specially.
if err := message.SendTicks(conn, start, deadline); err != nil {
logging.Logger.WithError(err).Warn("sender: ping.message.SendTicks failed")
return
}
var totalSent int64
for {
select {
Expand All @@ -57,8 +63,8 @@ func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.M
return
}
dst <- m // Liveness: this is blocking
if err := ping.SendTicks(conn, deadline); err != nil {
logging.Logger.WithError(err).Warn("sender: ping.SendTicks failed")
if err := message.SendTicks(conn, start, deadline); err != nil {
logging.Logger.WithError(err).Warn("sender: ping.message.SendTicks failed")
return
}
default:
Expand Down Expand Up @@ -99,9 +105,9 @@ func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.M
// Liveness guarantee: the sender will not be stuck sending for more then
// the MaxRuntime of the subtest, provided that the consumer will
// continue reading from the returned channel. This is enforced by
// setting the write deadline to Time.Now() + MaxRuntime.
func Start(conn *websocket.Conn, src <-chan model.Measurement) <-chan model.Measurement {
// setting the write deadline to |start| + MaxRuntime.
func Start(conn *websocket.Conn, src <-chan model.Measurement, start time.Time) <-chan model.Measurement {
dst := make(chan model.Measurement)
go loop(conn, src, dst)
go loop(conn, src, dst, start)
return dst
}
7 changes: 4 additions & 3 deletions ndt7/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ func warnAndClose(writer http.ResponseWriter, message string) {
// testerFunc is the function implementing a subtest. The first argument
// is the subtest context. The second argument is the connected websocket. The
// third argument is the open file where to write results. This function does
// not own the second or the third argument.
type testerFunc = func(context.Context, *websocket.Conn, *results.File)
// not own the second or the third argument. The fourth argument is the base
// start time of the test.
type testerFunc = func(context.Context, *websocket.Conn, *results.File, time.Time)

// downloadOrUpload implements both download and upload. The writer argument
// is the HTTP response writer. The request argument is the HTTP request
Expand Down Expand Up @@ -114,7 +115,7 @@ func (h Handler) downloadOrUpload(writer http.ResponseWriter, request *http.Requ
}
warnonerror.Close(resultfp, string(kind)+": ignoring resultfp.Close error")
}()
tester(request.Context(), conn, resultfp)
tester(request.Context(), conn, resultfp, result.StartTime)
}

// Download handles the download subtest.
Expand Down
15 changes: 8 additions & 7 deletions ndt7/measurer/measurer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@ import (

// Measurer performs measurements
type Measurer struct {
conn *websocket.Conn
uuid string
conn *websocket.Conn
start time.Time
uuid string
}

// New creates a new measurer instance
func New(conn *websocket.Conn, UUID string) *Measurer {
func New(conn *websocket.Conn, UUID string, start time.Time) *Measurer {
return &Measurer{
conn: conn,
uuid: UUID,
conn: conn,
start: start,
uuid: UUID,
}
}

Expand Down Expand Up @@ -80,7 +82,6 @@ func (m *Measurer) loop(ctx context.Context, dst chan<- model.Measurement) {
return
}
defer sockfp.Close()
start := time.Now()
connectionInfo := &model.ConnectionInfo{
Client: m.conn.RemoteAddr().String(),
Server: m.conn.LocalAddr().String(),
Expand All @@ -104,7 +105,7 @@ func (m *Measurer) loop(ctx context.Context, dst chan<- model.Measurement) {
return
}
var measurement model.Measurement
measure(&measurement, sockfp, now.Sub(start))
measure(&measurement, sockfp, now.Sub(m.start))
measurement.ConnectionInfo = connectionInfo
dst <- measurement // Liveness: this is blocking
}
Expand Down
47 changes: 47 additions & 0 deletions ndt7/ping/message/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Package message implements operations with WebSocket PING messages.
package message

import (
"encoding/json"
"errors"
"time"

"github.com/gorilla/websocket"
)

// The json object is used as a namespace to avoid erratic interpretation of
// unsolicited pong frames. Ping and pong frames are not a part of
// Sec-WebSocket-Protocol, they're part of RFC6455. Section 5.5.3 of the RFC
// allows unsolicited pong frames. Some browsers are known to send unsolicited
// pong frames, see golang/go#6377 <https://github.com/golang/go/issues/6377>.
type pingMessage struct {
Ndt7TS int64
}

// SendTicks sends the current ticks as a ping message.
func SendTicks(conn *websocket.Conn, start time.Time, deadline time.Time) error {
msg := pingMessage{
Ndt7TS: time.Since(start).Nanoseconds(),
}
data, err := json.Marshal(msg)
if err == nil {
err = conn.WriteControl(websocket.PingMessage, data, deadline)
}
return err
}

func ParseTicks(s string, start time.Time) (elapsed time.Duration, d time.Duration, err error) {
elapsed = time.Since(start)
var msg pingMessage
err = json.Unmarshal([]byte(s), &msg)
if err != nil {
return
}
prev := msg.Ndt7TS
if 0 <= prev && prev <= elapsed.Nanoseconds() {
d = time.Duration(elapsed.Nanoseconds() - prev)
} else {
err = errors.New("RTT is negative")
}
return
}
32 changes: 0 additions & 32 deletions ndt7/ping/ping.go

This file was deleted.

25 changes: 13 additions & 12 deletions ndt7/receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/gorilla/websocket"
"github.com/m-lab/ndt-server/logging"
"github.com/m-lab/ndt-server/ndt7/model"
"github.com/m-lab/ndt-server/ndt7/ping"
"github.com/m-lab/ndt-server/ndt7/ping/message"
"github.com/m-lab/ndt-server/ndt7/spec"
)

Expand All @@ -23,24 +23,25 @@ const (

func loop(
ctx context.Context, conn *websocket.Conn, kind receiverKind,
dst chan<- model.Measurement,
dst chan<- model.Measurement, start time.Time,
) {
logging.Logger.Debug("receiver: start")
defer logging.Logger.Debug("receiver: stop")
defer close(dst)
conn.SetReadLimit(spec.MaxMessageSize)
receiverctx, cancel := context.WithTimeout(ctx, spec.MaxRuntime)
defer cancel()
err := conn.SetReadDeadline(time.Now().Add(spec.MaxRuntime)) // Liveness!
err := conn.SetReadDeadline(start.Add(spec.MaxRuntime)) // Liveness!
if err != nil {
logging.Logger.WithError(err).Warn("receiver: conn.SetReadDeadline failed")
return
}
conn.SetPongHandler(func(s string) error {
rtt, err := ping.ParseTicks(s)
_, rtt, err := message.ParseTicks(s, start)
if err == nil {
rtt /= int64(time.Millisecond)
logging.Logger.Debugf("receiver: ApplicationLevel RTT: %d ms", rtt)
// Writing rtt to |dst| will write the Measurement to `ClientMeasurements` object.
// That goes against data format, so the value is just logged.
logging.Logger.Debugf("receiver: ApplicationLevel RTT: %d ms", int64(rtt / time.Millisecond))
}
return err
})
Expand Down Expand Up @@ -72,9 +73,9 @@ func loop(
}
}

func start(ctx context.Context, conn *websocket.Conn, kind receiverKind) <-chan model.Measurement {
func startReceiver(ctx context.Context, conn *websocket.Conn, kind receiverKind, start time.Time) (<-chan model.Measurement) {
dst := make(chan model.Measurement)
go loop(ctx, conn, kind, dst)
go loop(ctx, conn, kind, dst, start)
return dst
}

Expand All @@ -87,13 +88,13 @@ func start(ctx context.Context, conn *websocket.Conn, kind receiverKind) <-chan
// Liveness guarantee: the goroutine will always terminate after a
// MaxRuntime timeout, provided that the consumer will keep reading
// from the returned channel.
func StartDownloadReceiver(ctx context.Context, conn *websocket.Conn) <-chan model.Measurement {
return start(ctx, conn, downloadReceiver)
func StartDownloadReceiver(ctx context.Context, conn *websocket.Conn, start time.Time) (<-chan model.Measurement) {
return startReceiver(ctx, conn, downloadReceiver, start)
}

// StartUploadReceiver is like StartDownloadReceiver except that it
// tolerates incoming binary messages, which are sent to cause
// network load, and therefore must not be rejected.
func StartUploadReceiver(ctx context.Context, conn *websocket.Conn) <-chan model.Measurement {
return start(ctx, conn, uploadReceiver)
func StartUploadReceiver(ctx context.Context, conn *websocket.Conn, start time.Time) (<-chan model.Measurement) {
return startReceiver(ctx, conn, uploadReceiver, start)
}
19 changes: 11 additions & 8 deletions ndt7/upload/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import (
"github.com/m-lab/ndt-server/logging"
"github.com/m-lab/ndt-server/ndt7/closer"
"github.com/m-lab/ndt-server/ndt7/model"
"github.com/m-lab/ndt-server/ndt7/ping"
"github.com/m-lab/ndt-server/ndt7/ping/message"
"github.com/m-lab/ndt-server/ndt7/spec"
)

func loop(
conn *websocket.Conn, src <-chan model.Measurement,
dst chan<- model.Measurement,
dst chan<- model.Measurement, start time.Time,
) {
logging.Logger.Debug("sender: start")
defer logging.Logger.Debug("sender: stop")
Expand All @@ -24,12 +24,15 @@ func loop(
// make sure we drain the channel
}
}()
deadline := time.Now().Add(spec.MaxRuntime)
deadline := start.Add(spec.MaxRuntime)
err := conn.SetWriteDeadline(deadline) // Liveness!
if err != nil {
logging.Logger.WithError(err).Warn("sender: conn.SetWriteDeadline failed")
return
}
// There is no "special" first RTT sample for /upload. The server can't control if the client
// has already started flooding the connection. So, if server sends ping frame here,
// the pong frame may be still affected by HOL, like every other ping-pong frame.
for {
m, ok := <-src
if !ok { // This means that the previous step has terminated
Expand All @@ -41,8 +44,8 @@ func loop(
return
}
dst <- m // Liveness: this is blocking
if err := ping.SendTicks(conn, deadline); err != nil {
logging.Logger.WithError(err).Warn("sender: ping.SendTicks failed")
if err := message.SendTicks(conn, start, deadline); err != nil {
logging.Logger.WithError(err).Warn("sender: ping.message.SendTicks failed")
return
}
}
Expand All @@ -55,9 +58,9 @@ func loop(
// Liveness guarantee: the sender will not be stuck sending for more then
// the MaxRuntime of the subtest, provided that the consumer will
// continue reading from the returned channel. This is enforced by
// setting the write deadline to MaxRuntime + time.Now.
func Start(conn *websocket.Conn, src <-chan model.Measurement) <-chan model.Measurement {
// setting the write deadline to |start| + MaxRuntime.
func Start(conn *websocket.Conn, src <-chan model.Measurement, start time.Time) <-chan model.Measurement {
dst := make(chan model.Measurement)
go loop(conn, src, dst)
go loop(conn, src, dst, start)
return dst
}
14 changes: 8 additions & 6 deletions ndt7/upload/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package upload

import (
"context"
"time"

"github.com/gorilla/websocket"
"github.com/m-lab/ndt-server/ndt7/measurer"
Expand All @@ -15,14 +16,15 @@ import (
// Do implements the upload subtest. The ctx argument is the parent context
// for the subtest. The conn argument is the open WebSocket connection. The
// resultfp argument is the file where to save results. Both arguments are
// owned by the caller of this function.
func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File) {
// owned by the caller of this function. The start argument is the test
// start time used to calculate ElapsedTime and deadlines.
func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File, start time.Time) {
// Implementation note: use child context so that, if we cannot save the
// results in the loop below, we terminate the goroutines early
wholectx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
measurer := measurer.New(conn, resultfp.Data.UUID)
senderch := sender.Start(conn, measurer.Start(ctx))
receiverch := receiver.StartUploadReceiver(wholectx, conn)
measurer := measurer.New(conn, resultfp.Data.UUID, start)
senderch := sender.Start(conn, measurer.Start(ctx), start)
receiverch := receiver.StartUploadReceiver(ctx, conn, start)
saver.SaveAll(resultfp, senderch, receiverch)
}