Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions fastdeploy/engine/common_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@
)
from fastdeploy.inter_communicator.fmq import FMQ
from fastdeploy.metrics.metrics import main_process_metrics
from fastdeploy.metrics.prometheus_multiprocess_setup import (
get_original_prom_dir,
setup_dp_prometheus_dir,
)
from fastdeploy.model_executor.guided_decoding import schema_checker
from fastdeploy.plugins.token_processor import load_token_processor_plugins
from fastdeploy.spec_decode import SpecMethod
Expand Down Expand Up @@ -2720,6 +2724,7 @@ def launch_components(self):
self.launched_expert_service_signal.value[0] = 1
self.dp_processed = []
self.dp_engine_worker_queue_server = []
base_prom_dir = get_original_prom_dir()
for i in range(
1,
self.cfg.parallel_config.data_parallel_size // self.cfg.nnode,
Expand Down Expand Up @@ -2758,10 +2763,15 @@ def launch_components(self):
f"Engine is initialized successfully with {self.cfg.parallel_config.tensor_parallel_size}"
+ f" data parallel id {i}"
)
if envs.FD_ENABLE_INTERNAL_ADAPTER:
setup_dp_prometheus_dir(i, base_prom_dir)
self.dp_processed[-1].start()
while self.launched_expert_service_signal.value[i] == 0:
time.sleep(1)

if envs.FD_ENABLE_INTERNAL_ADAPTER:
setup_dp_prometheus_dir(0, base_prom_dir)

def check_worker_initialize_status(self):
"""
Check the initlialize status of workers by stdout logging
Expand Down
10 changes: 10 additions & 0 deletions fastdeploy/engine/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@
log_request_error,
)
from fastdeploy.metrics.metrics import main_process_metrics
from fastdeploy.metrics.prometheus_multiprocess_setup import (
get_original_prom_dir,
setup_dp_prometheus_dir,
)
from fastdeploy.platforms import current_platform
from fastdeploy.utils import EngineError, console_logger, envs, llm_logger

Expand Down Expand Up @@ -860,6 +864,7 @@ def launch_components(self):
self.launched_expert_service_signal.value[0] = 1
self.dp_processed = []
self.dp_engine_worker_queue_server = []
base_prom_dir = get_original_prom_dir()
for i in range(
1,
self.cfg.parallel_config.data_parallel_size // self.cfg.nnode,
Expand Down Expand Up @@ -897,8 +902,13 @@ def launch_components(self):
f"Engine is initialized successfully with {self.cfg.parallel_config.tensor_parallel_size}"
+ f" data parallel id {i}"
)
if envs.FD_ENABLE_INTERNAL_ADAPTER:
setup_dp_prometheus_dir(i, base_prom_dir)
self.dp_processed[-1].start()

if envs.FD_ENABLE_INTERNAL_ADAPTER:
setup_dp_prometheus_dir(0, base_prom_dir)

for i in range(
1,
self.cfg.parallel_config.data_parallel_size // self.cfg.nnode,
Expand Down
9 changes: 2 additions & 7 deletions fastdeploy/entrypoints/openai/multi_api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import sys
import time

from fastdeploy.metrics.prometheus_multiprocess_setup import setup_dp_prometheus_dir
from fastdeploy.platforms import current_platform
from fastdeploy.utils import find_free_ports, get_logger, is_port_available

Expand Down Expand Up @@ -108,13 +109,7 @@ def start_servers(
env["FD_ENABLE_MULTI_API_SERVER"] = "1"
env["FD_LOG_DIR"] = env.get("FD_LOG_DIR", "log") + f"/log_{i}"
if "PROMETHEUS_MULTIPROC_DIR" in env:
prom_dir = env.get("PROMETHEUS_MULTIPROC_DIR")
prom_dir_i = os.path.join(os.path.dirname(prom_dir), os.path.basename(prom_dir) + f"_dp{i}")
# Create the directory if it doesn't exist
if not os.path.exists(prom_dir_i):
os.makedirs(prom_dir_i, exist_ok=True)
env["PROMETHEUS_MULTIPROC_DIR"] = prom_dir_i
logger.info(f"Set PROMETHEUS_MULTIPROC_DIR for DP {i}: {prom_dir_i}")
setup_dp_prometheus_dir(i, env["PROMETHEUS_MULTIPROC_DIR"], env)

cmd = [
sys.executable,
Expand Down
59 changes: 44 additions & 15 deletions fastdeploy/metrics/prometheus_multiprocess_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,56 @@

from fastdeploy.utils import llm_logger

_original_prom_dir = None


def get_original_prom_dir():
    """Return the PROMETHEUS_MULTIPROC_DIR recorded before any dp suffix was appended.

    Returns:
        The value held in the module-level ``_original_prom_dir``. The module
        initialises it to ``None``, so ``None`` is returned until it has been
        assigned; callers should be prepared for that.
    """
    return _original_prom_dir


def setup_multiprocess_prometheus():
"""
Cleans and recreates the Prometheus multiprocess directory.
"""
"""Cleans and recreates the Prometheus multiprocess directory."""
global _original_prom_dir

if "PROMETHEUS_MULTIPROC_DIR" not in os.environ:
base_dir = "/tmp/prom_main"
instance_id = str(uuid.uuid4())
prom_dir = f"{base_dir}_{instance_id}"
prom_dir = f"/tmp/prom_main_{uuid.uuid4()}"
if os.path.exists(prom_dir):
shutil.rmtree(prom_dir, ignore_errors=True)
os.makedirs(prom_dir, exist_ok=True)
llm_logger.info(f"PROMETHEUS_MULTIPROC_DIR is set to be {prom_dir}")
os.environ["PROMETHEUS_MULTIPROC_DIR"] = prom_dir
_original_prom_dir = prom_dir
llm_logger.info(f"PROMETHEUS_MULTIPROC_DIR is set to be {prom_dir}")
return prom_dir
else:
prom_dir = os.environ["PROMETHEUS_MULTIPROC_DIR"]
llm_logger.warning(
f"Found PROMETHEUS_MULTIPROC_DIR:{prom_dir} was set by user. "
"you will find inaccurate metrics. Unset the variable "
"will properly handle cleanup."
)
return os.environ["PROMETHEUS_MULTIPROC_DIR"]

user_dir = os.environ["PROMETHEUS_MULTIPROC_DIR"]
_original_prom_dir = user_dir
os.makedirs(user_dir, exist_ok=True)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Bug 这里在用户传入的 PROMETHEUS_MULTIPROC_DIR 上直接 os.makedirs(..., exist_ok=True) 并继续复用目录,但没有清理已有 .db 文件。

get_filtered_metrics() 仍会通过 multiprocess.MultiProcessCollector 读取当前目录里的所有 .db,而后续 setup_dp_prometheus_dir(0, base_dir) 还会把 base 目录下的旧 .db 迁到 dp0/。固定复用同一个目录或服务异常退出后,上一轮进程的 counter/histogram 会被当成本轮 DP0 指标采集,PR 反而会静默产生不准确 metrics;旧代码至少会 warning 这个风险。

建议修复方式:在注册 metrics 前保证本轮使用的 base/dp* 目录没有旧 .db,例如为用户目录再创建带 UUID 的 run 子目录作为 base_dir,或显式删除 base 与对应 dp{i} 子目录中的 .db;如果不打算接管用户目录清理,则保留 warning/校验并拒绝非空目录。

llm_logger.info(f"PROMETHEUS_MULTIPROC_DIR is set to {user_dir}")
return user_dir


def setup_dp_prometheus_dir(dp_id, base_dir, env_dict=None):
    """Set up an isolated PROMETHEUS_MULTIPROC_DIR subdirectory for a DP rank.

    The per-rank directory is ``<base_dir>/dp<dp_id>``; it is created when
    missing and then exported as ``PROMETHEUS_MULTIPROC_DIR``.

    For DP0: regular ``*.db`` files located directly in ``base_dir`` are
    renamed into ``dp0/``. The rename stays on the same filesystem, so
    mmap-backed writers that already hold those files open are expected to
    remain valid.
    For DP1+: only the ``dp{i}/`` subdirectory is created and exported. The
    forked child is expected to get a new PID, which makes prometheus_client
    start fresh ``.db`` files inside that subdirectory.

    Args:
        dp_id: Data parallel rank id.
        base_dir: Original PROMETHEUS_MULTIPROC_DIR (before any dp suffix).
            If ``None`` (no base directory has been recorded), nothing is
            created or exported and a warning is logged instead of failing
            inside ``os.path.join``.
        env_dict: If provided, write to this dict instead of os.environ.

    Returns:
        The per-rank directory path, or ``None`` when ``base_dir`` is ``None``.
    """
    if base_dir is None:
        # Metrics isolation is best-effort: a missing base directory should
        # not abort the launch of a data-parallel rank.
        llm_logger.warning(f"Skip PROMETHEUS_MULTIPROC_DIR setup for DP {dp_id}: base directory is not set")
        return None

    prom_dir_dp = os.path.join(base_dir, f"dp{dp_id}")
    # Creating the child also creates base_dir, so base_dir is known to exist below.
    os.makedirs(prom_dir_dp, exist_ok=True)

    if dp_id == 0:
        # os.listdir returns a snapshot list, so renaming while looping is safe.
        for fname in os.listdir(base_dir):
            if not fname.endswith(".db"):
                continue
            src = os.path.join(base_dir, fname)
            # Skip directories that merely happen to end in ".db".
            if os.path.isfile(src):
                os.rename(src, os.path.join(prom_dir_dp, fname))
                llm_logger.info(f"Moved {src} -> {prom_dir_dp}")

    target = env_dict if env_dict is not None else os.environ
    target["PROMETHEUS_MULTIPROC_DIR"] = prom_dir_dp
    llm_logger.info(f"Set PROMETHEUS_MULTIPROC_DIR for DP {dp_id}: {prom_dir_dp}")
    return prom_dir_dp
2 changes: 1 addition & 1 deletion tests/entrypoints/openai/test_multi_api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ def capture_popen(*args, **kwargs):
for i, prom_dir in enumerate(prom_dirs):
# The directory should contain the server index (0 or 1)
# to uniquely identify each server's metrics directory
self.assertIn(f"_dp{i}", prom_dir, f"PROMETHEUS_MULTIPROC_DIR for server {i} should contain _dp{i}")
self.assertIn(f"/dp{i}", prom_dir, f"PROMETHEUS_MULTIPROC_DIR for server {i} should contain /dp{i}")


if __name__ == "__main__":
Expand Down
8 changes: 2 additions & 6 deletions tests/metrics/test_prometheus_multiprocess_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,11 @@ def test_when_env_var_already_set(self):
test_dir = "/tmp/existing_dir"
os.environ["PROMETHEUS_MULTIPROC_DIR"] = test_dir

with patch("fastdeploy.utils.llm_logger.warning") as mock_logger:
with patch("fastdeploy.utils.llm_logger.info") as mock_logger:
result = setup_multiprocess_prometheus()

assert result == test_dir
mock_logger.assert_called_once_with(
"Found PROMETHEUS_MULTIPROC_DIR:/tmp/existing_dir was set by user. "
"you will find inaccurate metrics. Unset the variable "
"will properly handle cleanup."
)
mock_logger.assert_called_once_with(f"PROMETHEUS_MULTIPROC_DIR is set to {test_dir}")

def test_cleanup_failure_handling(self):
"""测试清理目录失败时的处理"""
Expand Down
Loading