Skip to content

Commit 8603172

Browse files
weetsterNusnus
andauthored
Ensure that task results are delivered during pool shutdown (#435)
* Do not terminate ResultHandler thread so that task results can be reported properly * Test case to ensure that results are delivered during pool shutdown * Added newline at EOF --------- Co-authored-by: Tomer Nosrati <[email protected]>
1 parent 849a3e8 commit 8603172

File tree

2 files changed

+58
-1
lines changed

2 files changed

+58
-1
lines changed

billiard/pool.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1659,7 +1659,12 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
16591659
debug('helping task handler/workers to finish')
16601660
cls._help_stuff_finish(*help_stuff_finish_args)
16611661

1662-
result_handler.terminate()
1662+
# Send the sentinel to the result handler but don't terminate the
1663+
# result handler thread. This allows the thread to continue
1664+
# processing results in ResultHandler.finish_at_shutdown() until
1665+
# the cache is drained, ensuring that all task results are properly
1666+
# stored. A call to ResultHandler.terminate() is not necessary here
1667+
# because the thread will exit naturally when the cache becomes empty.
16631668
cls._set_result_sentinel(outqueue, pool)
16641669

16651670
if timeout_handler is not None:

t/unit/test_pool.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ def get_on_ready_count():
1515
worker = inspect.stack()[1].frame.f_locals['self']
1616
return worker.on_ready_counter.value
1717

18+
def simple_task(x):
19+
return x * 2
1820

1921
class test_pool:
2022
def test_raises(self):
@@ -56,3 +58,53 @@ def test_on_ready_counter_is_synchronized(self):
5658
pool.close()
5759
pool.join()
5860
pool.terminate()
61+
62+
def test_graceful_shutdown_delivers_results(self):
63+
"""Test that queued results are delivered during pool shutdown.
64+
65+
Specifically, this test verifies that when _terminate_pool() is called,
66+
the ResultHandler.finish_at_shutdown() continues processing results
67+
that workers have placed in the outqueue.
68+
"""
69+
70+
# Create pool with threads=False so that the result handler thread does
71+
# not start and the task results are allowed to build up in the queue.
72+
pool = billiard.pool.Pool(processes=2, threads=False)
73+
74+
# Submit tasks so that results are queued but not processed.
75+
results = [pool.apply_async(simple_task, (i,)) for i in range(8)]
76+
77+
# Allow a small amount of time for tasks to complete.
78+
time.sleep(0.5)
79+
80+
# Close and join the pool to ensure workers stop.
81+
pool.close()
82+
pool.join()
83+
84+
# Call the _terminate_pool() class method to trigger the finish_at_shutdown()
85+
# function that will process results in the queue. Normally _terminate_pool()
86+
# is called by a Finalize object when the Pool object is destroyed. We cannot
87+
# call pool.terminate() here because it will call the Finalize object, which
88+
# won't do anything until the Pool object is destroyed at the end of this test.
89+
# We can simulate the shutdown behaviour by calling _terminate_pool() directly.
90+
billiard.pool.Pool._terminate_pool(
91+
pool._taskqueue,
92+
pool._inqueue,
93+
pool._outqueue,
94+
pool._pool,
95+
pool._worker_handler,
96+
pool._task_handler,
97+
pool._result_handler,
98+
pool._cache,
99+
pool._timeout_handler,
100+
pool._help_stuff_finish_args()
101+
)
102+
103+
# Cancel the Finalize object to prevent _terminate_pool() from being called
104+
# a second time when the Pool object is destroyed.
105+
pool._terminate.cancel()
106+
107+
# Verify that all results were delivered by finish_at_shutdown() and can be
108+
# retrieved.
109+
for i, result in enumerate(results):
110+
assert result.get() == i * 2

0 commit comments

Comments
 (0)