30
30
import java .util .concurrent .CompletableFuture ;
31
31
import java .util .concurrent .CompletionStage ;
32
32
import java .util .concurrent .ConcurrentHashMap ;
33
+ import java .util .concurrent .atomic .AtomicBoolean ;
33
34
34
- import java .util .concurrent .atomic .AtomicReference ;
35
35
import org .apache .commons .logging .Log ;
36
36
import org .apache .commons .logging .LogFactory ;
37
37
import org .apache .commons .pool2 .impl .GenericObjectPool ;
38
38
import org .apache .commons .pool2 .impl .GenericObjectPoolConfig ;
39
+
39
40
import org .springframework .beans .factory .DisposableBean ;
40
- import org .springframework .context .SmartLifecycle ;
41
41
import org .springframework .data .redis .connection .PoolException ;
42
42
import org .springframework .util .Assert ;
43
43
62
62
* @since 2.0
63
63
* @see #getConnection(Class)
64
64
*/
65
- class LettucePoolingConnectionProvider implements LettuceConnectionProvider , RedisClientProvider , DisposableBean ,
66
- SmartLifecycle {
65
+ class LettucePoolingConnectionProvider implements LettuceConnectionProvider , RedisClientProvider , DisposableBean {
67
66
68
67
private static final Log log = LogFactory .getLog (LettucePoolingConnectionProvider .class );
69
68
70
- private final AtomicReference < State > state = new AtomicReference <>( State . CREATED );
69
+ private final AtomicBoolean disposed = new AtomicBoolean ( );
71
70
private final LettuceConnectionProvider connectionProvider ;
72
71
private final GenericObjectPoolConfig <StatefulConnection <?, ?>> poolConfig ;
73
72
private final Map <StatefulConnection <?, ?>, GenericObjectPool <StatefulConnection <?, ?>>> poolRef = new ConcurrentHashMap <>(
@@ -81,10 +80,6 @@ class LettucePoolingConnectionProvider implements LettuceConnectionProvider, Red
81
80
private final Map <Class <?>, AsyncPool <StatefulConnection <?, ?>>> asyncPools = new ConcurrentHashMap <>(32 );
82
81
private final BoundedPoolConfig asyncPoolConfig ;
83
82
84
- enum State {
85
- CREATED , STARTING , STARTED , STOPPING , STOPPED , DESTROYED ;
86
- }
87
-
88
83
LettucePoolingConnectionProvider (LettuceConnectionProvider connectionProvider ,
89
84
LettucePoolingClientConfiguration clientConfiguration ) {
90
85
@@ -215,51 +210,43 @@ public CompletableFuture<Void> releaseAsync(StatefulConnection<?, ?> connection)
215
210
216
211
@ Override
217
212
public void destroy () throws Exception {
218
- stop ();
219
- state .set (State .DESTROYED );
220
- }
221
-
222
213
223
- @ Override
224
- public void start () {
225
- state .set (State .STARTED );
226
- }
214
+ if (!disposed .compareAndSet (false , true )) {
215
+ return ;
216
+ }
227
217
228
- @ Override
229
- public void stop () {
230
- if (state .compareAndSet (State .STARTED , State .STOPPING )) {
231
- List <CompletableFuture <?>> futures = new ArrayList <>();
232
- if (!poolRef .isEmpty () || !asyncPoolRef .isEmpty ()) {
233
- log .warn ("LettucePoolingConnectionProvider contains unreleased connections" );
234
- }
218
+ List <CompletableFuture <?>> futures = new ArrayList <>();
219
+ if (!poolRef .isEmpty () || !asyncPoolRef .isEmpty ()) {
220
+ log .warn ("LettucePoolingConnectionProvider contains unreleased connections" );
221
+ }
235
222
236
- if (!inProgressAsyncPoolRef .isEmpty ()) {
223
+ if (!inProgressAsyncPoolRef .isEmpty ()) {
237
224
238
- log .warn ("LettucePoolingConnectionProvider has active connection retrievals" );
239
- inProgressAsyncPoolRef .forEach ((k , v ) -> futures .add (k .thenApply (StatefulConnection ::closeAsync )));
240
- }
225
+ log .warn ("LettucePoolingConnectionProvider has active connection retrievals" );
226
+ inProgressAsyncPoolRef .forEach ((k , v ) -> futures .add (k .thenApply (StatefulConnection ::closeAsync )));
227
+ }
241
228
242
- if (!poolRef .isEmpty ()) {
229
+ if (!poolRef .isEmpty ()) {
243
230
244
- poolRef .forEach ((connection , pool ) -> pool .returnObject (connection ));
245
- poolRef .clear ();
246
- }
231
+ poolRef .forEach ((connection , pool ) -> pool .returnObject (connection ));
232
+ poolRef .clear ();
233
+ }
247
234
248
- if (!asyncPoolRef .isEmpty ()) {
235
+ if (!asyncPoolRef .isEmpty ()) {
249
236
250
- asyncPoolRef .forEach ((connection , pool ) -> futures .add (pool .release (connection )));
251
- asyncPoolRef .clear ();
252
- }
237
+ asyncPoolRef .forEach ((connection , pool ) -> futures .add (pool .release (connection )));
238
+ asyncPoolRef .clear ();
239
+ }
253
240
254
- pools .forEach ((type , pool ) -> pool .close ());
241
+ pools .forEach ((type , pool ) -> pool .close ());
255
242
256
- CompletableFuture
243
+ CompletableFuture
257
244
.allOf (futures .stream ().map (it -> it .exceptionally (LettuceFutureUtils .ignoreErrors ()))
258
- .toArray (CompletableFuture []::new )) //
245
+ .toArray (CompletableFuture []::new )) //
259
246
.thenCompose (ignored -> {
260
247
261
248
CompletableFuture [] poolClose = asyncPools .values ().stream ().map (AsyncPool ::closeAsync )
262
- .map (it -> it .exceptionally (LettuceFutureUtils .ignoreErrors ())).toArray (CompletableFuture []::new );
249
+ .map (it -> it .exceptionally (LettuceFutureUtils .ignoreErrors ())).toArray (CompletableFuture []::new );
263
250
264
251
return CompletableFuture .allOf (poolClose );
265
252
}) //
@@ -269,18 +256,7 @@ public void stop() {
269
256
}) //
270
257
.join ();
271
258
272
- pools .clear ();
273
- }
274
- state .set (State .STOPPED );
275
- }
276
-
277
- @ Override
278
- public boolean isRunning () {
279
- return State .STARTED .equals (this .state .get ());
259
+ pools .clear ();
280
260
}
281
261
282
- @ Override
283
- public boolean isAutoStartup () {
284
- return true ;
285
- }
286
262
}
0 commit comments