Skip to content

Commit 077f71d

Browse files
authored
opt: renovate the concurrency management of gnet engine (#663)
1 parent 451f015 commit 077f71d

File tree

6 files changed

+106
-147
lines changed

6 files changed

+106
-147
lines changed

acceptor_windows.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"errors"
1919
"net"
2020
"runtime"
21-
"sync/atomic"
2221

2322
errorx "github.com/panjf2000/gnet/v2/pkg/errors"
2423
)
@@ -36,7 +35,7 @@ func (eng *engine) listenStream(ln net.Listener) (err error) {
3635
tc, e := ln.Accept()
3736
if e != nil {
3837
err = e
39-
if atomic.LoadInt32(&eng.beingShutdown) == 0 {
38+
if !eng.beingShutdown.Load() {
4039
eng.opts.Logger.Errorf("Accept() fails due to error: %v", err)
4140
} else if errors.Is(err, net.ErrClosed) {
4241
err = errors.Join(err, errorx.ErrEngineShutdown)
@@ -74,7 +73,7 @@ func (eng *engine) ListenUDP(pc net.PacketConn) (err error) {
7473
n, addr, e := pc.ReadFrom(buffer[:])
7574
if e != nil {
7675
err = e
77-
if atomic.LoadInt32(&eng.beingShutdown) == 0 {
76+
if !eng.beingShutdown.Load() {
7877
eng.opts.Logger.Errorf("failed to receive data from UDP fd due to error:%v", err)
7978
} else if errors.Is(err, net.ErrClosed) {
8079
err = errors.Join(err, errorx.ErrEngineShutdown)

client_unix.go

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"errors"
2323
"net"
2424
"strconv"
25-
"sync"
2625
"syscall"
2726

2827
"golang.org/x/sync/errgroup"
@@ -66,20 +65,17 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {
6665
return
6766
}
6867

69-
shutdownCtx, shutdown := context.WithCancel(context.Background())
68+
rootCtx, shutdown := context.WithCancel(context.Background())
69+
eg, ctx := errgroup.WithContext(rootCtx)
7070
eng := engine{
7171
listeners: make(map[int]*listener),
7272
opts: options,
73+
turnOff: shutdown,
7374
eventHandler: eh,
74-
workerPool: struct {
75+
concurrency: struct {
7576
*errgroup.Group
76-
shutdownCtx context.Context
77-
shutdown context.CancelFunc
78-
once sync.Once
79-
}{&errgroup.Group{}, shutdownCtx, shutdown, sync.Once{}},
80-
}
81-
if options.Ticker {
82-
eng.ticker.ctx, eng.ticker.cancel = context.WithCancel(context.Background())
77+
ctx context.Context
78+
}{eg, ctx},
8379
}
8480
el := eventloop{
8581
listeners: eng.listeners,
@@ -124,10 +120,14 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {
124120
func (cli *Client) Start() error {
125121
logging.Infof("Starting gnet client with 1 event-loop")
126122
cli.el.eventHandler.OnBoot(Engine{cli.el.engine})
127-
cli.el.engine.workerPool.Go(cli.el.run)
123+
cli.el.engine.concurrency.Go(cli.el.run)
128124
// Start the ticker.
129125
if cli.opts.Ticker {
130-
go cli.el.ticker(cli.el.engine.ticker.ctx)
126+
ctx := cli.el.engine.concurrency.ctx
127+
cli.el.engine.concurrency.Go(func() error {
128+
cli.el.ticker(ctx)
129+
return nil
130+
})
131131
}
132132
logging.Debugf("default logging level is %s", logging.LogLevel())
133133
return nil
@@ -136,11 +136,7 @@ func (cli *Client) Start() error {
136136
// Stop stops the client event-loop.
137137
func (cli *Client) Stop() (err error) {
138138
logging.Error(cli.el.poller.Trigger(queue.HighPriority, func(_ any) error { return errorx.ErrEngineShutdown }, nil))
139-
// Stop the ticker.
140-
if cli.opts.Ticker {
141-
cli.el.engine.ticker.cancel()
142-
}
143-
_ = cli.el.engine.workerPool.Wait()
139+
err = cli.el.engine.concurrency.Wait()
144140
logging.Error(cli.el.poller.Close())
145141
cli.el.eventHandler.OnShutdown(Engine{cli.el.engine})
146142
logging.Cleanup()

client_windows.go

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -48,17 +48,17 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {
4848
}
4949
logging.SetDefaultLoggerAndFlusher(logger, logFlusher)
5050

51-
shutdownCtx, shutdown := context.WithCancel(context.Background())
51+
rootCtx, shutdown := context.WithCancel(context.Background())
52+
eg, ctx := errgroup.WithContext(rootCtx)
5253
eng := &engine{
53-
listeners: []*listener{},
54-
opts: options,
55-
workerPool: struct {
56-
*errgroup.Group
57-
shutdownCtx context.Context
58-
shutdown context.CancelFunc
59-
once sync.Once
60-
}{&errgroup.Group{}, shutdownCtx, shutdown, sync.Once{}},
54+
listeners: []*listener{},
55+
opts: options,
56+
turnOff: shutdown,
6157
eventHandler: eh,
58+
concurrency: struct {
59+
*errgroup.Group
60+
ctx context.Context
61+
}{eg, ctx},
6262
}
6363
cli.el = &eventloop{
6464
ch: make(chan any, 1024),
@@ -71,11 +71,11 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {
7171

7272
func (cli *Client) Start() error {
7373
cli.el.eventHandler.OnBoot(Engine{cli.el.eng})
74-
cli.el.eng.workerPool.Go(cli.el.run)
74+
cli.el.eng.concurrency.Go(cli.el.run)
7575
if cli.opts.Ticker {
76-
cli.el.eng.ticker.ctx, cli.el.eng.ticker.cancel = context.WithCancel(context.Background())
77-
cli.el.eng.workerPool.Go(func() error {
78-
cli.el.ticker(cli.el.eng.ticker.ctx)
76+
ctx := cli.el.eng.concurrency.ctx
77+
cli.el.eng.concurrency.Go(func() error {
78+
cli.el.ticker(ctx)
7979
return nil
8080
})
8181
}
@@ -85,10 +85,7 @@ func (cli *Client) Start() error {
8585

8686
func (cli *Client) Stop() (err error) {
8787
cli.el.ch <- errorx.ErrEngineShutdown
88-
if cli.opts.Ticker {
89-
cli.el.eng.ticker.cancel()
90-
}
91-
_ = cli.el.eng.workerPool.Wait()
88+
err = cli.el.eng.concurrency.Wait()
9289
cli.el.eventHandler.OnShutdown(Engine{cli.el.eng})
9390
logging.Cleanup()
9491
return

engine_unix.go

Lines changed: 40 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"errors"
2323
"runtime"
2424
"strings"
25-
"sync"
2625
"sync/atomic"
2726

2827
"golang.org/x/sync/errgroup"
@@ -35,27 +34,22 @@ import (
3534
)
3635

3736
type engine struct {
38-
listeners map[int]*listener // listeners for accepting incoming connections
39-
opts *Options // options with engine
40-
ingress *eventloop // main event-loop that monitors all listeners
41-
eventLoops loadBalancer // event-loops for handling events
42-
inShutdown int32 // whether the engine is in shutdown
43-
ticker struct {
44-
ctx context.Context // context for ticker
45-
cancel context.CancelFunc // function to stop the ticker
46-
}
47-
workerPool struct {
37+
listeners map[int]*listener // listeners for accepting incoming connections
38+
opts *Options // options with engine
39+
ingress *eventloop // main event-loop that monitors all listeners
40+
eventLoops loadBalancer // event-loops for handling events
41+
inShutdown atomic.Bool // whether the engine is in shutdown
42+
turnOff context.CancelFunc
43+
eventHandler EventHandler // user eventHandler
44+
concurrency struct {
4845
*errgroup.Group
4946

50-
shutdownCtx context.Context
51-
shutdown context.CancelFunc
52-
once sync.Once
47+
ctx context.Context
5348
}
54-
eventHandler EventHandler // user eventHandler
5549
}
5650

57-
func (eng *engine) isInShutdown() bool {
58-
return atomic.LoadInt32(&eng.inShutdown) == 1
51+
func (eng *engine) isShutdown() bool {
52+
return eng.inShutdown.Load()
5953
}
6054

6155
// shutdown signals the engine to shut down.
@@ -64,9 +58,7 @@ func (eng *engine) shutdown(err error) {
6458
eng.opts.Logger.Errorf("engine is being shutdown with error: %v", err)
6559
}
6660

67-
eng.workerPool.once.Do(func() {
68-
eng.workerPool.shutdown()
69-
})
61+
eng.turnOff()
7062
}
7163

7264
func (eng *engine) closeEventLoops() {
@@ -88,7 +80,7 @@ func (eng *engine) closeEventLoops() {
8880
}
8981
}
9082

91-
func (eng *engine) runEventLoops(numEventLoop int) error {
83+
func (eng *engine) runEventLoops(ctx context.Context, numEventLoop int) error {
9284
var el0 *eventloop
9385
lns := eng.listeners
9486
// Create loops locally and bind the listeners.
@@ -129,21 +121,21 @@ func (eng *engine) runEventLoops(numEventLoop int) error {
129121

130122
// Start event-loops in background.
131123
eng.eventLoops.iterate(func(_ int, el *eventloop) bool {
132-
eng.workerPool.Go(el.run)
124+
eng.concurrency.Go(el.run)
133125
return true
134126
})
135127

136128
if el0 != nil {
137-
eng.workerPool.Go(func() error {
138-
el0.ticker(eng.ticker.ctx)
129+
eng.concurrency.Go(func() error {
130+
el0.ticker(ctx)
139131
return nil
140132
})
141133
}
142134

143135
return nil
144136
}
145137

146-
func (eng *engine) activateReactors(numEventLoop int) error {
138+
func (eng *engine) activateReactors(ctx context.Context, numEventLoop int) error {
147139
for i := 0; i < numEventLoop; i++ {
148140
p, err := netpoll.OpenPoller()
149141
if err != nil {
@@ -161,7 +153,7 @@ func (eng *engine) activateReactors(numEventLoop int) error {
161153

162154
// Start sub reactors in background.
163155
eng.eventLoops.iterate(func(_ int, el *eventloop) bool {
164-
eng.workerPool.Go(el.orbit)
156+
eng.concurrency.Go(el.orbit)
165157
return true
166158
})
167159

@@ -183,30 +175,30 @@ func (eng *engine) activateReactors(numEventLoop int) error {
183175
eng.ingress = el
184176

185177
// Start main reactor in background.
186-
eng.workerPool.Go(el.rotate)
178+
eng.concurrency.Go(el.rotate)
187179

188180
// Start the ticker.
189181
if eng.opts.Ticker {
190-
eng.workerPool.Go(func() error {
191-
eng.ingress.ticker(eng.ticker.ctx)
182+
eng.concurrency.Go(func() error {
183+
eng.ingress.ticker(ctx)
192184
return nil
193185
})
194186
}
195187

196188
return nil
197189
}
198190

199-
func (eng *engine) start(numEventLoop int) error {
191+
func (eng *engine) start(ctx context.Context, numEventLoop int) error {
200192
if eng.opts.ReusePort {
201-
return eng.runEventLoops(numEventLoop)
193+
return eng.runEventLoops(ctx, numEventLoop)
202194
}
203195

204-
return eng.activateReactors(numEventLoop)
196+
return eng.activateReactors(ctx, numEventLoop)
205197
}
206198

207-
func (eng *engine) stop(s Engine) {
199+
func (eng *engine) stop(ctx context.Context, s Engine) {
208200
// Wait on a signal for shutdown
209-
<-eng.workerPool.shutdownCtx.Done()
201+
<-ctx.Done()
210202

211203
eng.eventHandler.OnShutdown(s)
212204

@@ -225,20 +217,15 @@ func (eng *engine) stop(s Engine) {
225217
}
226218
}
227219

228-
// Stop the ticker.
229-
if eng.ticker.cancel != nil {
230-
eng.ticker.cancel()
231-
}
232-
233-
if err := eng.workerPool.Wait(); err != nil {
220+
if err := eng.concurrency.Wait(); err != nil {
234221
eng.opts.Logger.Errorf("engine shutdown error: %v", err)
235222
}
236223

237224
// Close all listeners and pollers of event-loops.
238225
eng.closeEventLoops()
239226

240227
// Put the engine into the shutdown state.
241-
atomic.StoreInt32(&eng.inShutdown, 1)
228+
eng.inShutdown.Store(true)
242229
}
243230

244231
func run(eventHandler EventHandler, listeners []*listener, options *Options, addrs []string) error {
@@ -261,17 +248,17 @@ func run(eventHandler EventHandler, listeners []*listener, options *Options, add
261248
for _, ln := range listeners {
262249
lns[ln.fd] = ln
263250
}
264-
shutdownCtx, shutdown := context.WithCancel(context.Background())
251+
rootCtx, shutdown := context.WithCancel(context.Background())
252+
eg, ctx := errgroup.WithContext(rootCtx)
265253
eng := engine{
266-
listeners: lns,
267-
opts: options,
268-
workerPool: struct {
269-
*errgroup.Group
270-
shutdownCtx context.Context
271-
shutdown context.CancelFunc
272-
once sync.Once
273-
}{&errgroup.Group{}, shutdownCtx, shutdown, sync.Once{}},
254+
listeners: lns,
255+
opts: options,
256+
turnOff: shutdown,
274257
eventHandler: eventHandler,
258+
concurrency: struct {
259+
*errgroup.Group
260+
ctx context.Context
261+
}{eg, ctx},
275262
}
276263
switch options.LB {
277264
case RoundRobin:
@@ -282,23 +269,19 @@ func run(eventHandler EventHandler, listeners []*listener, options *Options, add
282269
eng.eventLoops = new(sourceAddrHashLoadBalancer)
283270
}
284271

285-
if eng.opts.Ticker {
286-
eng.ticker.ctx, eng.ticker.cancel = context.WithCancel(context.Background())
287-
}
288-
289272
e := Engine{&eng}
290273
switch eng.eventHandler.OnBoot(e) {
291-
case None:
274+
case None, Close:
292275
case Shutdown:
293276
return nil
294277
}
295278

296-
if err := eng.start(numEventLoop); err != nil {
279+
if err := eng.start(ctx, numEventLoop); err != nil {
297280
eng.closeEventLoops()
298281
eng.opts.Logger.Errorf("gnet engine is stopping with error: %v", err)
299282
return err
300283
}
301-
defer eng.stop(e)
284+
defer eng.stop(rootCtx, e)
302285

303286
for _, addr := range addrs {
304287
allEngines.Store(addr, &eng)

0 commit comments

Comments
 (0)