Skip to content

Commit f1f28fd

Browse files
committed
implement serve helper function
1 parent 9087c24 commit f1f28fd

File tree

2 files changed

+57
-42
lines changed

2 files changed

+57
-42
lines changed

cmd/state/state.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ type GlobalState struct {
7979
SecretsManager *secretsource.Manager
8080
Usage *usage.Usage
8181
TestStatus *lib.TestStatus
82+
// ServerListener is used mainly for testing purposes (See https://github.com/grafana/k6/issues/3846)
8283
ServerListener net.Listener
8384
}
8485

internal/cmd/run.go

Lines changed: 56 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"errors"
66
"fmt"
77
"io"
8-
"net"
98
"net/http"
109
"os"
1110
"strings"
@@ -56,6 +55,54 @@ const (
5655
waitForTracerProviderStopTimeout = 3 * time.Minute
5756
)
5857

58+
// serve starts the REST API server and ensures it is properly shut down when
59+
// the srvCtx is done. It returns a function that can be waited on to ensure
60+
// the server has fully stopped.
61+
//
62+
// nolint:nestif
63+
func (c *cmdRun) serve(cmd *cobra.Command, srv *http.Server, shutdown func()) func() {
64+
var logger logrus.FieldLogger = c.gs.Logger
65+
66+
// We cannot use backgroundProcesses here, since we need the REST API to
67+
// be down before we can close the samples channel above and finish the
68+
// processing the metrics pipeline.
69+
apiWG := &sync.WaitGroup{}
70+
apiWG.Add(2)
71+
72+
go func() {
73+
defer apiWG.Done()
74+
logger.Debugf("Starting the REST API server on %s", c.gs.Flags.Address)
75+
if c.gs.Flags.ProfilingEnabled {
76+
logger.Debugf("Profiling exposed on http://%s/debug/pprof/", c.gs.Flags.Address)
77+
}
78+
79+
// ServerListener is set up in tests
80+
if c.gs.ServerListener != nil {
81+
if err := srv.Serve(c.gs.ServerListener); err != nil && !errors.Is(err, http.ErrServerClosed) {
82+
logger.WithError(err).Error("Error from API server")
83+
c.gs.OSExit(int(exitcodes.CannotStartRESTAPI))
84+
}
85+
} else {
86+
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
87+
// Only exit k6 if the user has explicitly set the REST API address
88+
if cmd.Flags().Lookup("address").Changed {
89+
logger.WithError(err).Error("Error from API server")
90+
c.gs.OSExit(int(exitcodes.CannotStartRESTAPI))
91+
} else {
92+
logger.WithError(err).Warn("Error from API server")
93+
}
94+
}
95+
}
96+
}()
97+
98+
go func() {
99+
defer apiWG.Done()
100+
shutdown()
101+
}()
102+
103+
return apiWG.Wait
104+
}
105+
59106
// TODO: split apart some more
60107
//
61108
//nolint:funlen,gocognit,gocyclo,cyclop
@@ -290,18 +337,10 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
290337
})
291338
samples := make(chan metrics.SampleContainer, test.derivedConfig.MetricSamplesBufferSize.Int64)
292339
// Spin up the REST API server, if not disabled.
293-
if c.gs.Flags.Address != "" { //nolint:nestif
340+
if c.gs.Flags.Address != "" {
294341
initBar.Modify(pb.WithConstProgress(0, "Init API server"))
295342

296-
// We cannot use backgroundProcesses here, since we need the REST API to
297-
// be down before we can close the samples channel above and finish the
298-
// processing the metrics pipeline.
299-
apiWG := &sync.WaitGroup{}
300-
apiWG.Add(2)
301-
defer apiWG.Wait()
302-
303343
srvCtx, srvCancel := context.WithCancel(globalCtx)
304-
defer srvCancel()
305344

306345
srv := api.GetServer(
307346
runCtx,
@@ -311,44 +350,19 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
311350
metricsEngine,
312351
execScheduler,
313352
)
314-
go func() {
315-
defer apiWG.Done()
316-
logger.Debugf("Starting the REST API server on %s", c.gs.Flags.Address)
317-
if c.gs.Flags.ProfilingEnabled {
318-
logger.Debugf("Profiling exposed on http://%s/debug/pprof/", c.gs.Flags.Address)
319-
}
320-
if c.gs.ServerListener == nil {
321-
listener, err := (&net.ListenConfig{}).Listen(srvCtx, "tcp", c.gs.Flags.Address)
322-
if err != nil {
323-
logger.WithError(err).Error("Error creating API server listener")
324-
c.gs.OSExit(int(exitcodes.CannotStartRESTAPI))
325-
}
326-
c.gs.ServerListener = listener
327-
}
328-
defer func() {
329-
if lerr := c.gs.ServerListener.Close(); lerr != nil {
330-
logger.WithError(lerr).Debug("Error closing API server listener")
331-
}
332-
c.gs.ServerListener = nil
333-
}()
334-
if aerr := srv.Serve(c.gs.ServerListener); aerr != nil && !errors.Is(aerr, http.ErrServerClosed) {
335-
// Only exit k6 if the user has explicitly set the REST API address
336-
if cmd.Flags().Lookup("address").Changed {
337-
logger.WithError(aerr).Error("Error from API server")
338-
c.gs.OSExit(int(exitcodes.CannotStartRESTAPI))
339-
} else {
340-
logger.WithError(aerr).Warn("Error from API server")
341-
}
342-
}
343-
}()
344-
go func() {
345-
defer apiWG.Done()
353+
shutdown := func() {
346354
<-srvCtx.Done()
347355
shutdCtx, shutdCancel := context.WithTimeout(globalCtx, 1*time.Second)
348356
defer shutdCancel()
349357
if aerr := srv.Shutdown(shutdCtx); aerr != nil {
350358
logger.WithError(aerr).Debug("REST API server did not shut down correctly")
351359
}
360+
}
361+
srvWait := c.serve(cmd, srv, shutdown)
362+
defer func() {
363+
// Cancel the server context before shutdown to avoid the deadlocks
364+
srvCancel()
365+
srvWait()
352366
}()
353367
}
354368

0 commit comments

Comments
 (0)