Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion billiard/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1659,7 +1659,12 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
debug('helping task handler/workers to finish')
cls._help_stuff_finish(*help_stuff_finish_args)

result_handler.terminate()
# Send the sentinel to the result handler but don't terminate the
# result handler thread. This allows the thread to continue
# processing results in ResultHandler.finish_at_shutdown() until
# the cache is drained, ensuring that all task results are properly
# stored. A call to ResultHandler.terminate() is not necessary here
# because the thread will exit naturally when the cache becomes empty.
cls._set_result_sentinel(outqueue, pool)

if timeout_handler is not None:
Expand Down
52 changes: 52 additions & 0 deletions t/unit/test_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ def get_on_ready_count():
worker = inspect.stack()[1].frame.f_locals['self']
return worker.on_ready_counter.value

def simple_task(x):
return x * 2

class test_pool:
def test_raises(self):
Expand Down Expand Up @@ -56,3 +58,53 @@ def test_on_ready_counter_is_synchronized(self):
pool.close()
pool.join()
pool.terminate()

def test_graceful_shutdown_delivers_results(self):
"""Test that queued results are delivered during pool shutdown.

Specifically, this test verifies that when _terminate_pool() is called,
the ResultHandler.finish_at_shutdown() continues processing results
that workers have placed in the outqueue.
"""

# Create pool with threads=False so that the result handler thread does
# not start and the task results are allowed to build up in the queue.
pool = billiard.pool.Pool(processes=2, threads=False)

# Submit tasks so that results are queued but not processed.
results = [pool.apply_async(simple_task, (i,)) for i in range(8)]

# Allow a small amount of time for tasks to complete.
time.sleep(0.5)

# Close and join the pool to ensure workers stop.
pool.close()
pool.join()

# Call the _terminate_pool() class method to trigger the finish_at_shutdown()
# function that will process results in the queue. Normally _terminate_pool()
# is called by a Finalize object when the Pool object is destroyed. We cannot
# call pool.terminate() here because it will call the Finalize object, which
# won't do anything until the Pool object is destroyed at the end of this test.
# We can simulate the shutdown behaviour by calling _terminate_pool() directly.
billiard.pool.Pool._terminate_pool(
pool._taskqueue,
pool._inqueue,
pool._outqueue,
pool._pool,
pool._worker_handler,
pool._task_handler,
pool._result_handler,
pool._cache,
pool._timeout_handler,
pool._help_stuff_finish_args()
)

# Cancel the Finalize object to prevent _terminate_pool() from being called
# a second time when the Pool object is destroyed.
pool._terminate.cancel()

# Verify that all results were delivered by finish_at_shutdown() and can be
# retrieved.
for i, result in enumerate(results):
assert result.get() == i * 2
Loading