# Source code for maxent_grpo.training.runtime.ops.vllm_startup
"""Detect and classify vLLM server startup stalls from log text."""
from __future__ import annotations
from argparse import ArgumentParser
from enum import Enum
from pathlib import Path
# [docs]
class StartupStatus(str, Enum):
    """High-level startup state derived from vLLM log lines.

    Members mix in ``str``, so each member compares equal to its plain
    string value. ``classify_vllm_startup_log`` returns one of these members
    and ``main`` prints the member's ``value`` as the CLI verdict.
    """

    # Default verdict: no ready, error, or stall signature matched.
    STARTING = "starting"
    # At least one readiness marker was found in the log.
    HEALTHY = "healthy"
    # Boot hints are all present together with a stall signature (repeated
    # core-engine wait lines, or the post-init hints).
    CORE_ENGINE_STALL = "core_engine_stall"
    # An error marker was found and no readiness marker was present.
    ERROR = "error"
# Substrings whose presence means the server finished starting; any single
# match is enough for classify_vllm_startup_log to report HEALTHY.
_READY_MARKERS = (
    "Application startup complete.",
    "Uvicorn running on",
)
# Log line that is counted (via str.count) and compared against
# ``stall_threshold`` to detect a repeated core-engine wait.
_CORE_STALL_MARKER = "Waiting for 1 local, 0 remote core engine proc(s) to start."
# Boot-progress lines; ALL of them must be present before either stall
# signature is accepted.
_ENGINE_READY_HINTS = (
    "Started server process",
    "Waiting for application startup.",
    "Model loading took",
)
# Lines emitted after cache/profile init; when ALL are present alongside the
# boot hints (and no ready/error marker matched) the log is treated as stalled.
_POST_INIT_STALL_HINTS = (
    "GPU KV cache size:",
    "Maximum concurrency for",
)
# Substrings treated as failure evidence; any single match yields ERROR
# unless a readiness marker is also present (readiness is checked first).
# Note that "ERROR " keeps its trailing space as part of the match.
_ERROR_MARKERS = (
    "Traceback (most recent call last):",
    "RuntimeError:",
    "ERROR ",
)
# [docs]
def classify_vllm_startup_log(log_text: str, stall_threshold: int = 3) -> StartupStatus:
    """Map raw vLLM log text onto a :class:`StartupStatus` verdict.

    Checks are plain substring searches, applied in this precedence order:

    1. Any readiness marker -> ``HEALTHY``.
    2. Any error marker -> ``ERROR``.
    3. All boot hints present AND either the core-engine wait line occurs at
       least ``stall_threshold`` times, or all post-init hints are present
       -> ``CORE_ENGINE_STALL``.
    4. Otherwise -> ``STARTING``.

    Args:
        log_text: Full text of the vLLM log captured so far.
        stall_threshold: Minimum number of core-engine wait lines required
            for the repeated-wait stall signature.

    Returns:
        The startup state inferred from ``log_text``.
    """

    def _present(marker: str) -> bool:
        return marker in log_text

    if any(map(_present, _READY_MARKERS)):
        return StartupStatus.HEALTHY
    if any(map(_present, _ERROR_MARKERS)):
        return StartupStatus.ERROR

    booted = all(map(_present, _ENGINE_READY_HINTS))
    repeated_wait = log_text.count(_CORE_STALL_MARKER) >= stall_threshold
    # vLLM 0.8.x can hang after cache/profile init without emitting the
    # repeated "core engine proc(s)" line; that signature also counts as a
    # stall once the boot hints have all been seen.
    post_init_hang = booted and all(map(_present, _POST_INIT_STALL_HINTS))

    if booted and (repeated_wait or post_init_hang):
        return StartupStatus.CORE_ENGINE_STALL
    return StartupStatus.STARTING
# [docs]
def should_trigger_v0_fallback(
    log_text: str,
    attempt: int,
    min_attempts: int = 20,
    stall_threshold: int = 3,
) -> bool:
    """Decide whether a stuck vLLM startup should be relaunched in V0 mode.

    Args:
        log_text: Full text of the vLLM log captured so far.
        attempt: Index of the current health-check attempt.
        min_attempts: Attempts below this value never trigger the fallback.
        stall_threshold: Forwarded to ``classify_vllm_startup_log``.

    Returns:
        ``True`` only when ``attempt`` has reached ``min_attempts`` and the
        log classifies as ``StartupStatus.CORE_ENGINE_STALL``.
    """
    if attempt < min_attempts:
        # Too early to judge: the log is not even inspected yet.
        return False
    return (
        classify_vllm_startup_log(log_text, stall_threshold=stall_threshold)
        is StartupStatus.CORE_ENGINE_STALL
    )
def _build_parser() -> ArgumentParser:
    """Create the command-line parser for the startup classifier.

    Returns:
        A parser with a required ``--log`` path option plus the integer
        options ``--attempt``, ``--min-attempts`` and ``--stall-threshold``.
    """
    cli = ArgumentParser(description="Classify vLLM startup health from log content.")
    cli.add_argument("--log", type=Path, required=True, help="Path to the vLLM log file.")

    # (flag, default, help) for every integer-valued option, in help order.
    int_options = (
        ("--attempt", 0, "Current health-check attempt index."),
        (
            "--min-attempts",
            20,
            "Minimum attempt before core-stall fallback is allowed.",
        ),
        (
            "--stall-threshold",
            3,
            "Minimum repeated core-stall lines required for classification.",
        ),
    )
    for flag, default_value, help_text in int_options:
        cli.add_argument(flag, type=int, default=default_value, help=help_text)
    return cli
# [docs]
def main(argv: list[str] | None = None) -> int:
    """Run the startup-classification CLI and print one verdict line.

    Prints ``fallback_v0`` when ``should_trigger_v0_fallback`` reports that
    a relaunch is warranted; otherwise prints the ``StartupStatus`` value
    for the current log contents. A missing log file is treated as an empty
    log.

    Args:
        argv: Command-line arguments to parse. ``None`` (the default) lets
            ``argparse`` read ``sys.argv[1:]``, so the zero-argument call
            made by the ``__main__`` guard behaves as before.

    Returns:
        Always ``0``; the verdict is communicated on stdout.

    Raises:
        SystemExit: Raised by ``argparse`` for invalid or missing arguments
            (``--log`` is required).
    """
    args = _build_parser().parse_args(argv)

    text = ""
    # exists() is kept so every path it reports as absent still maps to an
    # empty log, exactly as before.
    if args.log.exists():
        try:
            text = args.log.read_text(encoding="utf-8", errors="ignore")
        except FileNotFoundError:
            # The log was removed between the exists() check and the read
            # (e.g. rotated during a relaunch); treat it as missing instead
            # of crashing the health-check poller.
            text = ""

    if should_trigger_v0_fallback(
        text,
        attempt=args.attempt,
        min_attempts=args.min_attempts,
        stall_threshold=args.stall_threshold,
    ):
        print("fallback_v0")
        return 0

    status = classify_vllm_startup_log(text, stall_threshold=args.stall_threshold)
    print(status.value)
    return 0
# Script entry point: when run directly, exit the interpreter with the
# integer returned by main() as the process exit code.
if __name__ == "__main__":
    raise SystemExit(main())