@@ -348,7 +348,7 @@ async def subscription_handler(msg):
348
348
349
349
async def subscription_handler2 (msg ):
350
350
msgs2 .append (msg )
351
- if len (msgs2 ) >= 1 :
351
+ if len (msgs2 ) >= 1 and not fut . done () :
352
352
fut .set_result (True )
353
353
354
354
await nc .connect (no_echo = True )
@@ -1274,8 +1274,6 @@ async def err_cb(e):
1274
1274
1275
1275
@async_test
1276
1276
async def test_closing_tasks (self ):
1277
- nc = NATS ()
1278
-
1279
1277
disconnected_count = 0
1280
1278
errors = []
1281
1279
closed_future = asyncio .Future ()
@@ -1298,12 +1296,11 @@ async def err_cb(e):
1298
1296
'disconnected_cb' : disconnected_cb ,
1299
1297
'error_cb' : err_cb ,
1300
1298
'closed_cb' : closed_cb ,
1301
- 'servers' : [
"nats://foo:[email protected] :4223" ],
1302
1299
'max_reconnect_attempts' : 3 ,
1303
1300
'dont_randomize' : True ,
1304
1301
}
1305
1302
1306
- await nc .connect (** options )
1303
+ nc = await nats .
connect (
"nats://foo:[email protected] :4223" , ** options )
1307
1304
self .assertTrue (nc .is_connected )
1308
1305
1309
1306
# Do a sudden close and wrap up test.
@@ -1316,7 +1313,7 @@ async def err_cb(e):
1316
1313
for task in asyncio .all_tasks ():
1317
1314
if not task .done ():
1318
1315
pending_tasks_count += 1
1319
- self .assertEqual ( expected_tasks , pending_tasks_count )
1316
+ self .assertTrue ( pending_tasks_count <= expected_tasks )
1320
1317
1321
1318
@async_test
1322
1319
async def test_pending_data_size_flush_reconnect (self ):
@@ -2666,6 +2663,11 @@ async def test_protocol_mixing(self):
2666
2663
2667
2664
@async_test
2668
2665
async def test_drain_cancelled_errors_raised (self ):
2666
+ try :
2667
+ from unittest .mock import AsyncMock
2668
+ except ImportError :
2669
+ pytest .skip ("skip since cannot use AsyncMock" )
2670
+
2669
2671
nc = NATS ()
2670
2672
await nc .connect ()
2671
2673
0 commit comments