From dadae3c4c383a484853df68bef5e529e0bad22ec Mon Sep 17 00:00:00 2001 From: Grzegorz Bokota Date: Mon, 28 Jul 2025 23:25:39 +0200 Subject: [PATCH] Use multiprocessing pool for parallelism --- sphinx/builders/__init__.py | 50 ++++++++++++++++++------------------- sphinx/util/parallel.py | 6 +---- 2 files changed, 26 insertions(+), 30 deletions(-) diff --git a/sphinx/builders/__init__.py b/sphinx/builders/__init__.py index 2dd972ecfe0..349af220981 100644 --- a/sphinx/builders/__init__.py +++ b/sphinx/builders/__init__.py @@ -7,6 +7,7 @@ import re import time from contextlib import nullcontext +from multiprocessing.pool import Pool from pathlib import Path from typing import TYPE_CHECKING, final @@ -607,25 +608,25 @@ def _read_parallel(self, docnames: list[str], nproc: int) -> None: self.events.emit('env-purge-doc', self.env, docname) self.env.clear_doc(docname) - def read_process(docs: list[str]) -> bytes: + def read_process(docs: list[str]) -> tuple[list[str], bytes]: self.env._app = self._app for docname in docs: self.read_doc(docname, _cache=False) # allow pickling self to send it back - return pickle.dumps(self.env, pickle.HIGHEST_PROTOCOL) + return docs, pickle.dumps(self.env, pickle.HIGHEST_PROTOCOL) def merge(docs: list[str], otherenv: bytes) -> None: env = pickle.loads(otherenv) self.env.merge_info_from(docs, env, self._app) - next(progress) + with Pool(processes=nproc) as pool: + # run read_process() in parallel + results = pool.imap(read_process, chunks) - tasks = ParallelTasks(nproc) - for chunk in chunks: - tasks.add_task(read_process, chunk, merge) + for (docs, bytes), _ in zip(results, progress): + # merge the results back into the main environment + merge(docs, bytes) - # make sure all threads have finished - tasks.join() logger.info('') @final @@ -793,8 +794,17 @@ def write_process(docs: list[tuple[str, nodes.document]]) -> None: firstname, docnames = docnames[0], docnames[1:] _write_docname(firstname, env=self.env, builder=self, tags=self.tags) - 
tasks = ParallelTasks(nproc) - chunks = make_chunks(docnames, nproc) + + input_data = [] + for docname in docnames: + doctree = self.env.get_and_resolve_doctree( + docname, self, tags=self.tags + ) + self.write_doc_serialized(docname, doctree) + input_data.append((docname, doctree)) + + + chunks = make_chunks(input_data, nproc) # create a status_iterator to step progressbar after writing a document # (see: ``on_chunk_done()`` function) @@ -806,22 +816,12 @@ def write_process(docs: list[tuple[str, nodes.document]]) -> None: self.config.verbosity, ) - def on_chunk_done(args: list[tuple[str, nodes.document]], result: None) -> None: - next(progress) - - self.phase = BuildPhase.RESOLVING - for chunk in chunks: - arg = [] - for docname in chunk: - doctree = self.env.get_and_resolve_doctree( - docname, self, tags=self.tags - ) - self.write_doc_serialized(docname, doctree) - arg.append((docname, doctree)) - tasks.add_task(write_process, arg, on_chunk_done) + with Pool(processes=nproc) as pool: + result = pool.imap(write_process, chunks) + for _ in zip(result, progress): + # just step the progress bar + pass - # make sure all threads have finished - tasks.join() logger.info('') def prepare_writing(self, docnames: Set[str]) -> None: diff --git a/sphinx/util/parallel.py b/sphinx/util/parallel.py index 3dd5e574c58..52edc5512b7 100644 --- a/sphinx/util/parallel.py +++ b/sphinx/util/parallel.py @@ -157,11 +157,7 @@ def make_chunks(arguments: Sequence[str], nproc: int, maxbatch: int = 10) -> lis # determine how many documents to read in one go nargs = len(arguments) chunksize = nargs // nproc - if chunksize >= maxbatch: - # try to improve batch size vs. number of batches - chunksize = int(sqrt(nargs / nproc * maxbatch)) - if chunksize == 0: - chunksize = 1 + chunksize = max(min(chunksize, maxbatch), 1) nchunks, rest = divmod(nargs, chunksize) if rest: nchunks += 1