Module: lean_interact.pool
This module provides the LeanServerPool class, which manages a pool of AutoLeanServer instances
sharing a common ReplaySessionCache. This allows efficient distribution of Lean commands across
multiple persistent REPL processes.
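For orientation, here is a minimal usage sketch. The imports are an assumption about the package layout (`LeanServerPool` lives in `lean_interact.pool`; `LeanREPLConfig` and `Command` are assumed to be importable from the top-level `lean_interact` package), so adjust them to your install:

```python
# Minimal sketch: start a pool, run one command, shut everything down.
from lean_interact import Command, LeanREPLConfig  # assumed top-level exports
from lean_interact.pool import LeanServerPool

pool = LeanServerPool(LeanREPLConfig(), num_workers=4)
try:
    result = pool.run(Command(cmd="theorem ex : 1 + 1 = 2 := by rfl"))
    print(result)
finally:
    pool.close()  # kill all REPL workers and release resources
```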
LeanServerPool
```python
LeanServerPool(
    config: LeanREPLConfig,
    num_workers: int | None = None,
    max_total_memory: float = 0.8,
    max_process_memory: float | None = 0.8,
    max_restart_attempts: int = 5,
)
```
A pool of AutoLeanServer instances sharing a ReplaySessionCache.
Initialize the Lean server pool.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `config` | `LeanREPLConfig` | The configuration for the Lean servers. | *required* |
| `num_workers` | `int \| None` | The number of workers to start. Defaults to `cpu_count() - 1`. | `None` |
| `max_total_memory` | `float` | Passed to `AutoLeanServer`. | `0.8` |
| `max_process_memory` | `float \| None` | Passed to `AutoLeanServer`. | `0.8` |
| `max_restart_attempts` | `int` | Passed to `AutoLeanServer`. | `5` |
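For example, a smaller pool with tighter limits might look like the sketch below; the values are illustrative, and the exact semantics of the memory fractions are defined by `AutoLeanServer`, to which they are forwarded:

```python
pool = LeanServerPool(
    config=LeanREPLConfig(),
    num_workers=2,            # override the cpu_count() - 1 default
    max_total_memory=0.6,     # forwarded to each AutoLeanServer
    max_process_memory=0.4,   # forwarded to each AutoLeanServer
    max_restart_attempts=3,   # forwarded to each AutoLeanServer
)
```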
Source code in src/lean_interact/pool.py
```python
def __init__(
    self,
    config: LeanREPLConfig,
    num_workers: int | None = None,
    max_total_memory: float = 0.8,
    max_process_memory: float | None = 0.8,
    max_restart_attempts: int = 5,
):
    """
    Initialize the Lean server pool.

    Args:
        config: The configuration for the Lean servers.
        num_workers: The number of workers to start. Defaults to `cpu_count() - 1`.
        max_total_memory: Passed to `AutoLeanServer`.
        max_process_memory: Passed to `AutoLeanServer`.
        max_restart_attempts: Passed to `AutoLeanServer`.
    """
    self.config = config
    self.num_workers = num_workers or max(1, mp.cpu_count() - 1)
    self.session_cache = ReplaySessionCache(lazy=True)
    self._lock = threading.Lock()
    self._workers: list[AutoLeanServer] = []
    self._free_workers: list[AutoLeanServer] = []
    self._workers_cond = threading.Condition(self._lock)
    self._async_cond: asyncio.Condition | None = None  # Lazy init

    def _create_server(_: int) -> AutoLeanServer:
        return AutoLeanServer(
            config=config,
            max_total_memory=max_total_memory,
            max_process_memory=max_process_memory,
            max_restart_attempts=max_restart_attempts,
            session_cache=self.session_cache,
        )

    with ThreadPoolExecutor(max_workers=self.num_workers) as executor:
        self._workers = list(executor.map(_create_server, range(self.num_workers)))
    self._free_workers = list(self._workers)
```
Attributes:

- `config` (instance attribute)
- `num_workers = num_workers or max(1, mp.cpu_count() - 1)` (instance attribute)
- `session_cache` (instance attribute)
close

```python
close() -> None
```

Close all workers and the session cache.
Source code in src/lean_interact/pool.py
```python
def close(self) -> None:
    """Close all workers and the session cache."""
    with self._lock:
        # We work on a copy to avoid concurrent modification issues if any
        workers_to_kill = list(self._workers)
        self._workers.clear()
        self._free_workers.clear()

    def _kill_worker(w: AutoLeanServer) -> None:
        try:
            w.kill()
        except Exception:
            pass

    if workers_to_kill:
        with ThreadPoolExecutor(max_workers=len(workers_to_kill)) as executor:
            executor.map(_kill_worker, workers_to_kill)
```
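Because each worker holds a persistent REPL process, it is worth guaranteeing that `close()` runs even when an error occurs. The pool is not documented here as a native context manager, but since it exposes a `close()` method, the standard library's `contextlib.closing` works as a sketch:

```python
from contextlib import closing

with closing(LeanServerPool(LeanREPLConfig())) as pool:
    result = pool.run(Command(cmd="#eval 1 + 1"))
# pool.close() has been called here, even if run() raised
```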
run
```python
run(
    request: BaseREPLQuery,
    *,
    verbose: bool = False,
    timeout: float | None = DEFAULT_TIMEOUT,
) -> BaseREPLResponse | LeanError
```
Run a command on an available worker.
Source code in src/lean_interact/pool.py
```python
def run(
    self,
    request: BaseREPLQuery,
    *,
    verbose: bool = False,
    timeout: float | None = DEFAULT_TIMEOUT,
) -> BaseREPLResponse | LeanError:
    """
    Run a command on an available worker.
    """
    # Extract desired env/proofState to optimize worker selection
    req_id: int | None = None
    if isinstance(request, Command):
        req_id = request.env
    elif isinstance(request, ProofStep):
        req_id = request.proof_state
    worker = self._acquire_worker_sync(req_id)
    try:
        return worker.run(request, verbose=verbose, timeout=timeout, add_to_session_cache=True)  # type: ignore
    finally:
        self._release_worker_sync(worker)
```
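The env/proofState extraction above lets the pool prefer a worker that already holds the requested state. When chaining commands, pass the environment id from an earlier response back in. This sketch assumes a successful command response exposes the new environment id as `.env`, as in the Lean REPL protocol; imports follow the earlier sketch:

```python
first = pool.run(Command(cmd="def n : Nat := 37"))
if not isinstance(first, LeanError):
    # Reusing first.env lets the pool route this to a worker that already
    # has that environment in its session cache (assumption: successful
    # command responses carry an `env` id).
    second = pool.run(Command(cmd="#eval n", env=first.env))
```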
async_run (async)

```python
async_run(
    request: BaseREPLQuery,
    *,
    verbose: bool = False,
    timeout: float | None = DEFAULT_TIMEOUT,
) -> BaseREPLResponse | LeanError
```
Run a command asynchronously on an available worker.
Source code in src/lean_interact/pool.py
```python
async def async_run(
    self,
    request: BaseREPLQuery,
    *,
    verbose: bool = False,
    timeout: float | None = DEFAULT_TIMEOUT,
) -> BaseREPLResponse | LeanError:
    """
    Run a command asynchronously on an available worker.
    """
    req_id: int | None = None
    if isinstance(request, Command):
        req_id = request.env
    elif isinstance(request, ProofStep):
        req_id = request.proof_state
    worker = await self._acquire_worker_async(req_id)
    try:
        return await worker.async_run(request, verbose=verbose, timeout=timeout, add_to_session_cache=True)  # type: ignore
    finally:
        await self._release_worker_async(worker)
```
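A sketch of driving the pool from async code, assuming an existing `pool` and the `Command` type used above:

```python
import asyncio

async def check_snippet(pool: LeanServerPool, snippet: str):
    # Awaiting async_run frees the event loop while a worker
    # processes the command.
    return await pool.async_run(Command(cmd=snippet), timeout=60)

print(asyncio.run(check_snippet(pool, "#eval 1 + 1")))
```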
run_batch
```python
run_batch(
    requests: list[BaseREPLQuery],
    *,
    verbose: bool = False,
    timeout_per_cmd: float | None = DEFAULT_TIMEOUT,
    show_progress: bool = False,
) -> list[BaseREPLResponse | LeanError | Exception]
```
Run a batch of commands on available workers.
Source code in src/lean_interact/pool.py
```python
def run_batch(
    self,
    requests: list[BaseREPLQuery],
    *,
    verbose: bool = False,
    timeout_per_cmd: float | None = DEFAULT_TIMEOUT,
    show_progress: bool = False,
) -> list[BaseREPLResponse | LeanError | Exception]:
    """
    Run a batch of commands on available workers.
    """
    with ThreadPoolExecutor(max_workers=self.num_workers) as executor:
        futures = [executor.submit(self.run, req, verbose=verbose, timeout=timeout_per_cmd) for req in requests]  # type: ignore
        # Map each future back to its request index so results keep request order
        index_by_future = {future: i for i, future in enumerate(futures)}
        results: list[BaseREPLResponse | LeanError | Exception | None] = [None] * len(futures)
        completed = as_completed(futures)
        if show_progress:
            from tqdm import tqdm  # type: ignore

            completed = tqdm(completed, total=len(futures), desc="Running batch")
        for future in completed:
            index = index_by_future[future]
            try:
                results[index] = future.result()
            except Exception as e:
                results[index] = e
    return results  # type: ignore
```
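Results come back in request order, and per-request failures are returned in place as `Exception` values rather than raised, so a caller can triage them individually. A sketch:

```python
snippets = ["#eval 1 + 1", "theorem t : True := trivial", "#check Nat"]
results = pool.run_batch([Command(cmd=s) for s in snippets], timeout_per_cmd=120)
for snippet, result in zip(snippets, results):
    if isinstance(result, Exception):
        print(f"{snippet!r} failed: {result}")
    else:
        print(f"{snippet!r} -> {result}")
```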
async_run_batch (async)

```python
async_run_batch(
    requests: list[BaseREPLQuery],
    *,
    verbose: bool = False,
    timeout_per_cmd: float | None = DEFAULT_TIMEOUT,
    show_progress: bool = False,
) -> list[BaseREPLResponse | LeanError | Exception]
```
Run a batch of commands asynchronously on available workers.
Source code in src/lean_interact/pool.py
```python
async def async_run_batch(
    self,
    requests: list[BaseREPLQuery],
    *,
    verbose: bool = False,
    timeout_per_cmd: float | None = DEFAULT_TIMEOUT,
    show_progress: bool = False,
) -> list[BaseREPLResponse | LeanError | Exception]:
    """
    Run a batch of commands asynchronously on available workers.
    """

    async def _indexed_run(index: int, request: BaseREPLQuery) -> tuple[int, BaseREPLResponse | LeanError | Exception]:
        # `asyncio.as_completed` yields awaitables in completion order, not
        # submission order, so carry the request index alongside the result
        # to keep `results` aligned with `requests`.
        try:
            return index, await self.async_run(request, verbose=verbose, timeout=timeout_per_cmd)  # type: ignore
        except Exception as e:
            return index, e

    tasks = [asyncio.create_task(_indexed_run(i, req)) for i, req in enumerate(requests)]
    results: list[BaseREPLResponse | LeanError | Exception | None] = [None] * len(tasks)
    completed = asyncio.as_completed(tasks)
    if show_progress:
        from tqdm import tqdm  # type: ignore

        completed = tqdm(completed, total=len(tasks), desc="Running batch")
    for future in completed:
        index, result = await future
        results[index] = result
    return results  # type: ignore
```
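And the async equivalent, end to end (a sketch; `show_progress=True` requires `tqdm` to be installed):

```python
import asyncio

async def main() -> None:
    pool = LeanServerPool(LeanREPLConfig(), num_workers=2)
    try:
        results = await pool.async_run_batch(
            [Command(cmd="#eval 2 ^ 10"), Command(cmd="#check List.map")],
            show_progress=True,
        )
        print(results)
    finally:
        pool.close()

asyncio.run(main())
```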