@@ -22,7 +22,6 @@ import (
22
22
"errors"
23
23
"runtime"
24
24
"strings"
25
- "sync"
26
25
"sync/atomic"
27
26
28
27
"golang.org/x/sync/errgroup"
@@ -35,27 +34,22 @@ import (
35
34
)
36
35
37
36
type engine struct {
38
- listeners map [int ]* listener // listeners for accepting incoming connections
39
- opts * Options // options with engine
40
- ingress * eventloop // main event-loop that monitors all listeners
41
- eventLoops loadBalancer // event-loops for handling events
42
- inShutdown int32 // whether the engine is in shutdown
43
- ticker struct {
44
- ctx context.Context // context for ticker
45
- cancel context.CancelFunc // function to stop the ticker
46
- }
47
- workerPool struct {
37
+ listeners map [int ]* listener // listeners for accepting incoming connections
38
+ opts * Options // options with engine
39
+ ingress * eventloop // main event-loop that monitors all listeners
40
+ eventLoops loadBalancer // event-loops for handling events
41
+ inShutdown atomic.Bool // whether the engine is in shutdown
42
+ turnOff context.CancelFunc
43
+ eventHandler EventHandler // user eventHandler
44
+ concurrency struct {
48
45
* errgroup.Group
49
46
50
- shutdownCtx context.Context
51
- shutdown context.CancelFunc
52
- once sync.Once
47
+ ctx context.Context
53
48
}
54
- eventHandler EventHandler // user eventHandler
55
49
}
56
50
57
- func (eng * engine ) isInShutdown () bool {
58
- return atomic . LoadInt32 ( & eng .inShutdown ) == 1
51
+ func (eng * engine ) isShutdown () bool {
52
+ return eng .inShutdown . Load ()
59
53
}
60
54
61
55
// shutdown signals the engine to shut down.
@@ -64,9 +58,7 @@ func (eng *engine) shutdown(err error) {
64
58
eng .opts .Logger .Errorf ("engine is being shutdown with error: %v" , err )
65
59
}
66
60
67
- eng .workerPool .once .Do (func () {
68
- eng .workerPool .shutdown ()
69
- })
61
+ eng .turnOff ()
70
62
}
71
63
72
64
func (eng * engine ) closeEventLoops () {
@@ -88,7 +80,7 @@ func (eng *engine) closeEventLoops() {
88
80
}
89
81
}
90
82
91
- func (eng * engine ) runEventLoops (numEventLoop int ) error {
83
+ func (eng * engine ) runEventLoops (ctx context. Context , numEventLoop int ) error {
92
84
var el0 * eventloop
93
85
lns := eng .listeners
94
86
// Create loops locally and bind the listeners.
@@ -129,21 +121,21 @@ func (eng *engine) runEventLoops(numEventLoop int) error {
129
121
130
122
// Start event-loops in background.
131
123
eng .eventLoops .iterate (func (_ int , el * eventloop ) bool {
132
- eng .workerPool .Go (el .run )
124
+ eng .concurrency .Go (el .run )
133
125
return true
134
126
})
135
127
136
128
if el0 != nil {
137
- eng .workerPool .Go (func () error {
138
- el0 .ticker (eng . ticker . ctx )
129
+ eng .concurrency .Go (func () error {
130
+ el0 .ticker (ctx )
139
131
return nil
140
132
})
141
133
}
142
134
143
135
return nil
144
136
}
145
137
146
- func (eng * engine ) activateReactors (numEventLoop int ) error {
138
+ func (eng * engine ) activateReactors (ctx context. Context , numEventLoop int ) error {
147
139
for i := 0 ; i < numEventLoop ; i ++ {
148
140
p , err := netpoll .OpenPoller ()
149
141
if err != nil {
@@ -161,7 +153,7 @@ func (eng *engine) activateReactors(numEventLoop int) error {
161
153
162
154
// Start sub reactors in background.
163
155
eng .eventLoops .iterate (func (_ int , el * eventloop ) bool {
164
- eng .workerPool .Go (el .orbit )
156
+ eng .concurrency .Go (el .orbit )
165
157
return true
166
158
})
167
159
@@ -183,30 +175,30 @@ func (eng *engine) activateReactors(numEventLoop int) error {
183
175
eng .ingress = el
184
176
185
177
// Start main reactor in background.
186
- eng .workerPool .Go (el .rotate )
178
+ eng .concurrency .Go (el .rotate )
187
179
188
180
// Start the ticker.
189
181
if eng .opts .Ticker {
190
- eng .workerPool .Go (func () error {
191
- eng .ingress .ticker (eng . ticker . ctx )
182
+ eng .concurrency .Go (func () error {
183
+ eng .ingress .ticker (ctx )
192
184
return nil
193
185
})
194
186
}
195
187
196
188
return nil
197
189
}
198
190
199
- func (eng * engine ) start (numEventLoop int ) error {
191
+ func (eng * engine ) start (ctx context. Context , numEventLoop int ) error {
200
192
if eng .opts .ReusePort {
201
- return eng .runEventLoops (numEventLoop )
193
+ return eng .runEventLoops (ctx , numEventLoop )
202
194
}
203
195
204
- return eng .activateReactors (numEventLoop )
196
+ return eng .activateReactors (ctx , numEventLoop )
205
197
}
206
198
207
- func (eng * engine ) stop (s Engine ) {
199
+ func (eng * engine ) stop (ctx context. Context , s Engine ) {
208
200
// Wait on a signal for shutdown
209
- <- eng . workerPool . shutdownCtx .Done ()
201
+ <- ctx .Done ()
210
202
211
203
eng .eventHandler .OnShutdown (s )
212
204
@@ -225,20 +217,15 @@ func (eng *engine) stop(s Engine) {
225
217
}
226
218
}
227
219
228
- // Stop the ticker.
229
- if eng .ticker .cancel != nil {
230
- eng .ticker .cancel ()
231
- }
232
-
233
- if err := eng .workerPool .Wait (); err != nil {
220
+ if err := eng .concurrency .Wait (); err != nil {
234
221
eng .opts .Logger .Errorf ("engine shutdown error: %v" , err )
235
222
}
236
223
237
224
// Close all listeners and pollers of event-loops.
238
225
eng .closeEventLoops ()
239
226
240
227
// Put the engine into the shutdown state.
241
- atomic . StoreInt32 ( & eng .inShutdown , 1 )
228
+ eng .inShutdown . Store ( true )
242
229
}
243
230
244
231
func run (eventHandler EventHandler , listeners []* listener , options * Options , addrs []string ) error {
@@ -261,17 +248,17 @@ func run(eventHandler EventHandler, listeners []*listener, options *Options, add
261
248
for _ , ln := range listeners {
262
249
lns [ln .fd ] = ln
263
250
}
264
- shutdownCtx , shutdown := context .WithCancel (context .Background ())
251
+ rootCtx , shutdown := context .WithCancel (context .Background ())
252
+ eg , ctx := errgroup .WithContext (rootCtx )
265
253
eng := engine {
266
- listeners : lns ,
267
- opts : options ,
268
- workerPool : struct {
269
- * errgroup.Group
270
- shutdownCtx context.Context
271
- shutdown context.CancelFunc
272
- once sync.Once
273
- }{& errgroup.Group {}, shutdownCtx , shutdown , sync.Once {}},
254
+ listeners : lns ,
255
+ opts : options ,
256
+ turnOff : shutdown ,
274
257
eventHandler : eventHandler ,
258
+ concurrency : struct {
259
+ * errgroup.Group
260
+ ctx context.Context
261
+ }{eg , ctx },
275
262
}
276
263
switch options .LB {
277
264
case RoundRobin :
@@ -282,23 +269,19 @@ func run(eventHandler EventHandler, listeners []*listener, options *Options, add
282
269
eng .eventLoops = new (sourceAddrHashLoadBalancer )
283
270
}
284
271
285
- if eng .opts .Ticker {
286
- eng .ticker .ctx , eng .ticker .cancel = context .WithCancel (context .Background ())
287
- }
288
-
289
272
e := Engine {& eng }
290
273
switch eng .eventHandler .OnBoot (e ) {
291
- case None :
274
+ case None , Close :
292
275
case Shutdown :
293
276
return nil
294
277
}
295
278
296
- if err := eng .start (numEventLoop ); err != nil {
279
+ if err := eng .start (ctx , numEventLoop ); err != nil {
297
280
eng .closeEventLoops ()
298
281
eng .opts .Logger .Errorf ("gnet engine is stopping with error: %v" , err )
299
282
return err
300
283
}
301
- defer eng .stop (e )
284
+ defer eng .stop (rootCtx , e )
302
285
303
286
for _ , addr := range addrs {
304
287
allEngines .Store (addr , & eng )
0 commit comments