Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 52 additions & 9 deletions python/cocoindex/subprocess_exec.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from __future__ import annotations

from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool
from dataclasses import dataclass, field
from typing import Any, Callable
import pickle
Expand All @@ -19,6 +20,8 @@
import os
import time
from .user_app_loader import load_user_app
from .runtime import execution_context
import logging

WATCHDOG_INTERVAL_SECONDS = 10.0

Expand All @@ -28,6 +31,7 @@
_pool_lock = threading.Lock()
_pool: ProcessPoolExecutor | None = None
_user_apps: list[str] = []
_logger = logging.getLogger(__name__)


def _get_pool() -> ProcessPoolExecutor:
Expand All @@ -48,6 +52,46 @@ def add_user_app(app_target: str) -> None:
_user_apps.append(app_target)


def _restart_pool(old_pool: ProcessPoolExecutor | None = None) -> None:
    """Replace the global ProcessPoolExecutor with a fresh one.

    Thread-safe via `_pool_lock`. The previous pool (if any) is shut down
    after the replacement is installed, so callers always observe a usable
    pool. If `old_pool` is given and the global pool has already been
    swapped by another thread, this is a no-op.
    """
    global _pool
    with _pool_lock:
        # Another thread may have already restarted the pool; don't do it twice.
        if old_pool is not None and _pool is not old_pool:
            return
        _logger.error("Detected dead subprocess pool; restarting and retrying.")
        stale_pool = _pool
        _pool = ProcessPoolExecutor(
            max_workers=1,
            initializer=_subprocess_init,
            initargs=(_user_apps, os.getpid()),
        )
        if stale_pool is not None:
            # Best-effort shutdown of previous pool; letting exceptions bubble up
            # is acceptable here and signals irrecoverable executor state.
            stale_pool.shutdown(cancel_futures=True)


async def _submit_with_restart(fn: Callable[..., Any], *args: Any) -> Any:
    """Submit and await work, restarting the subprocess until it succeeds.

    Retries on BrokenProcessPool or pool-shutdown RuntimeError; re-raises other
    exceptions.

    Args:
        fn: Picklable callable to run in the subprocess pool.
        *args: Positional arguments forwarded to `fn`.

    Returns:
        Whatever `fn(*args)` returns in the worker process.
    """
    while True:
        pool = _get_pool()
        try:
            fut = pool.submit(fn, *args)
            return await asyncio.wrap_future(fut)
        except BrokenProcessPool:
            # Worker died (crash/OOM); swap in a fresh pool and retry.
            _restart_pool(old_pool=pool)
        except RuntimeError as exc:
            # submit() raises RuntimeError on an already-shut-down executor
            # ("cannot schedule new futures after shutdown"). Retry only that
            # case; re-raise interpreter-shutdown and unrelated RuntimeErrors.
            msg = str(exc).lower()
            if "shutdown" not in msg or "interpreter" in msg:
                raise
            _restart_pool(old_pool=pool)
        # loop and retry


# ---------------------------------------------
# Subprocess: executor registry and helpers
# ---------------------------------------------
Expand Down Expand Up @@ -164,27 +208,26 @@ def __init__(self, executor_factory: type[Any], spec: Any) -> None:
(executor_factory, spec), protocol=pickle.HIGHEST_PROTOCOL
)

# Conditionally expose analyze if underlying class has it (sync-only in caller)
# Conditionally expose analyze if underlying class has it
if hasattr(executor_factory, "analyze"):
# Bind as attribute so getattr(..., "analyze", None) works upstream
def _analyze() -> Any:
fut = self._pool.submit(_sp_analyze, self._key_bytes)
return fut.result()
def analyze() -> Any:
return execution_context.run(
_submit_with_restart(_sp_analyze, self._key_bytes)
)

# Attach method
setattr(self, "analyze", _analyze)
setattr(self, "analyze", analyze)

if hasattr(executor_factory, "prepare"):

async def prepare() -> Any:
fut = self._pool.submit(_sp_prepare, self._key_bytes)
return await asyncio.wrap_future(fut)
return await _submit_with_restart(_sp_prepare, self._key_bytes)

setattr(self, "prepare", prepare)

async def __call__(self, *args: Any, **kwargs: Any) -> Any:
fut = self._pool.submit(_sp_call, self._key_bytes, args, kwargs)
return await asyncio.wrap_future(fut)
return await _submit_with_restart(_sp_call, self._key_bytes, args, kwargs)


def executor_stub(executor_factory: type[Any], spec: Any) -> Any:
Expand Down
Loading