Skip to content

Server Pool

Module: lean_interact.pool

This module provides the LeanServerPool class, which manages a pool of AutoLeanServer instances sharing a common ReplaySessionCache. This allows efficient distribution of Lean commands across multiple persistent REPL processes.

LeanServerPool

LeanServerPool(
    config: LeanREPLConfig,
    num_workers: int | None = None,
    max_total_memory: float = 0.8,
    max_process_memory: float | None = 0.8,
    max_restart_attempts: int = 5,
)

A pool of AutoLeanServer instances sharing a ReplaySessionCache.

Initialize the Lean server pool.

Parameters:

Name Type Description Default
config LeanREPLConfig

The configuration for the Lean servers.

required
num_workers int | None

The number of workers to start. Defaults to cpu_count() - 1.

None
max_total_memory float

Passed to AutoLeanServer.

0.8
max_process_memory float | None

Passed to AutoLeanServer.

0.8
max_restart_attempts int

Passed to AutoLeanServer.

5
Source code in src/lean_interact/pool.py
def __init__(
    self,
    config: LeanREPLConfig,
    num_workers: int | None = None,
    max_total_memory: float = 0.8,
    max_process_memory: float | None = 0.8,
    max_restart_attempts: int = 5,
):
    """
    Initialize the Lean server pool.

    Args:
        config: The configuration for the Lean servers.
        num_workers: The number of workers to start. Defaults to `cpu_count() - 1`.
        max_total_memory: Passed to `AutoLeanServer`.
        max_process_memory: Passed to `AutoLeanServer`.
        max_restart_attempts: Passed to `AutoLeanServer`.
    """
    self.config = config
    self.num_workers = num_workers or max(1, mp.cpu_count() - 1)
    self.session_cache = ReplaySessionCache(lazy=True)

    self._lock = threading.Lock()
    self._workers: list[AutoLeanServer] = []
    self._free_workers: list[AutoLeanServer] = []
    self._workers_cond = threading.Condition(self._lock)
    self._async_cond: asyncio.Condition | None = None  # Lazy init

    def _create_server(_: int) -> AutoLeanServer:
        return AutoLeanServer(
            config=config,
            max_total_memory=max_total_memory,
            max_process_memory=max_process_memory,
            max_restart_attempts=max_restart_attempts,
            session_cache=self.session_cache,
        )

    with ThreadPoolExecutor(max_workers=self.num_workers) as executor:
        self._workers = list(executor.map(_create_server, range(self.num_workers)))

    self._free_workers = list(self._workers)

config instance-attribute

config = config

num_workers instance-attribute

num_workers = num_workers or max(1, cpu_count() - 1)

session_cache instance-attribute

session_cache = ReplaySessionCache(lazy=True)

close

close() -> None

Close all workers and the session cache.

Source code in src/lean_interact/pool.py
def close(self) -> None:
    """Close all workers and the session cache."""
    with self._lock:
        # We work on a copy to avoid concurrent modification issues if any
        workers_to_kill = list(self._workers)
        self._workers.clear()
        self._free_workers.clear()

    def _kill_worker(w: AutoLeanServer) -> None:
        try:
            w.kill()
        except Exception:
            pass

    if workers_to_kill:
        with ThreadPoolExecutor(max_workers=len(workers_to_kill)) as executor:
            executor.map(_kill_worker, workers_to_kill)

run

run(
    request: BaseREPLQuery,
    *,
    verbose: bool = False,
    timeout: float | None = DEFAULT_TIMEOUT,
) -> BaseREPLResponse | LeanError

Run a command on an available worker.

Source code in src/lean_interact/pool.py
def run(
    self,
    request: BaseREPLQuery,
    *,
    verbose: bool = False,
    timeout: float | None = DEFAULT_TIMEOUT,
) -> BaseREPLResponse | LeanError:
    """
    Run a command on an available worker.
    """
    # Extract desired env/proofState to optimize worker selection
    req_id: int | None = None
    if isinstance(request, Command):
        req_id = request.env
    elif isinstance(request, ProofStep):
        req_id = request.proof_state

    worker = self._acquire_worker_sync(req_id)
    try:
        return worker.run(request, verbose=verbose, timeout=timeout, add_to_session_cache=True)  # type: ignore
    finally:
        self._release_worker_sync(worker)

async_run async

async_run(
    request: BaseREPLQuery,
    *,
    verbose: bool = False,
    timeout: float | None = DEFAULT_TIMEOUT,
) -> BaseREPLResponse | LeanError

Run a command asynchronously on an available worker.

Source code in src/lean_interact/pool.py
async def async_run(
    self,
    request: BaseREPLQuery,
    *,
    verbose: bool = False,
    timeout: float | None = DEFAULT_TIMEOUT,
) -> BaseREPLResponse | LeanError:
    """
    Run a command asynchronously on an available worker.
    """
    req_id: int | None = None
    if isinstance(request, Command):
        req_id = request.env
    elif isinstance(request, ProofStep):
        req_id = request.proof_state

    worker = await self._acquire_worker_async(req_id)
    try:
        return await worker.async_run(request, verbose=verbose, timeout=timeout, add_to_session_cache=True)  # type: ignore
    finally:
        await self._release_worker_async(worker)

run_batch

run_batch(
    requests: list[BaseREPLQuery],
    *,
    verbose: bool = False,
    timeout_per_cmd: float | None = DEFAULT_TIMEOUT,
    show_progress: bool = False,
) -> list[BaseREPLResponse | LeanError | Exception]

Run a batch of commands on available workers.

Source code in src/lean_interact/pool.py
def run_batch(
    self,
    requests: list[BaseREPLQuery],
    *,
    verbose: bool = False,
    timeout_per_cmd: float | None = DEFAULT_TIMEOUT,
    show_progress: bool = False,
) -> list[BaseREPLResponse | LeanError | Exception]:
    """
    Run a batch of commands on available workers.
    """
    with ThreadPoolExecutor(max_workers=self.num_workers) as executor:
        futures = [executor.submit(self.run, req, verbose=verbose, timeout=timeout_per_cmd) for req in requests]  # type: ignore
        results = [None] * len(futures)
        if show_progress:
            from tqdm import tqdm  # type: ignore

            for future in tqdm(as_completed(futures), total=len(futures), desc="Running batch"):
                index = futures.index(future)
                try:
                    results[index] = future.result()
                except Exception as e:
                    results[index] = e
        else:
            for future in as_completed(futures):
                index = futures.index(future)
                try:
                    results[index] = future.result()
                except Exception as e:
                    results[index] = e
        return results  # type: ignore

async_run_batch async

async_run_batch(
    requests: list[BaseREPLQuery],
    *,
    verbose: bool = False,
    timeout_per_cmd: float | None = DEFAULT_TIMEOUT,
    show_progress: bool = False,
) -> list[BaseREPLResponse | LeanError | Exception]

Run a batch of commands asynchronously on available workers.

Source code in src/lean_interact/pool.py
async def async_run_batch(
    self,
    requests: list[BaseREPLQuery],
    *,
    verbose: bool = False,
    timeout_per_cmd: float | None = DEFAULT_TIMEOUT,
    show_progress: bool = False,
) -> list[BaseREPLResponse | LeanError | Exception]:
    """
    Run a batch of commands asynchronously on available workers.
    """
    tasks = [self.async_run(req, verbose=verbose, timeout=timeout_per_cmd) for req in requests]  # type: ignore
    futures = [asyncio.create_task(task) for task in tasks]
    results = [None] * len(futures)
    if show_progress:
        from tqdm.asyncio import tqdm  # type: ignore

        for future in tqdm(asyncio.as_completed(futures), total=len(futures), desc="Running batch"):
            index = futures.index(future)
            try:
                results[index] = await future
            except Exception as e:
                results[index] = e
    else:
        for future in asyncio.as_completed(futures):
            index = futures.index(future)
            try:
                results[index] = await future
            except Exception as e:
                results[index] = e
    return results  # type: ignore